StreamCruncher - API and Configuration
Here is a list of classes provided by StreamCruncher as part of the API:
streamcruncher.api.StreamCruncher
streamcruncher.api.StreamCruncherException
streamcruncher.api.StartupShutdownHook
streamcruncher.api.InputSession
streamcruncher.api.OutputSession
streamcruncher.api.ParsedQuery
streamcruncher.api.ParserParameters
streamcruncher.api.QueryConfig
streamcruncher.api.QueryConfig.QuerySchedulePolicy
streamcruncher.api.ResultSetCacheConfig
streamcruncher.api.TimeWindowSizeProvider
streamcruncher.api.WindowSizeProvider
streamcruncher.api.DBName
streamcruncher.api.artifact.RowSpec
The JavaDocs provided in the downloadable package can also be consulted. Test Cases are also provided, which serve as demonstrations of the features.
streamcruncher.api.StreamCruncher is the main class. The Input and Output Event Streams are accessed using streamcruncher.api.InputSession and streamcruncher.api.OutputSession, respectively. The "Running Query" is wrapped and handled using streamcruncher.api.ParsedQuery and streamcruncher.api.ParserParameters. The classes in the streamcruncher.api.artifact package should be used to provide hints to the Kernel about the SQL/RDBMS equivalent data types of the Events' properties.
Starting StreamCruncher (and stopping)
java -Dsc.config.file=.\resources\sc_config_mysql.properties com.myproduct.CruncherClient

..

public class CruncherClient {
    public static void main(String[] args) {
        String prop = System.getProperty("sc.config.file");

        StreamCruncher cruncher = new StreamCruncher();
        cruncher.start(prop);

        //Option 1:
        //----------
        //Spawn a Thread to perform Event registration,
        //handling and other real work.
        ..
        ..

        //Main Thread just waits for Console message.
        cruncher.keepRunning();

        //Option 2:
        //----------
        //Perform Event registration, handling and other
        //real work.
        ..
        ..

        //Stops the Kernel.
        cruncher.stop();
    }
    ..
    ..
}
StreamCruncher supports a few Database products that can be used as the underlying datastore. These are configured in a Properties file that has to be provided to the Kernel during startup through the API.
Registering an Input Event Stream
String[] columnNames = { "country", "state", "city", "item_sku",
        "item_qty", "order_time", "order_id" };

//DB specific. MySQL shown here.
String[] columnTypes = { java.lang.String.class.getName() + ":SIZE=15",
        java.lang.String.class.getName() + ":SIZE=15",
        java.lang.String.class.getName() + ":SIZE=15",
        java.lang.String.class.getName() + ":SIZE=15",
        java.lang.Integer.class.getName(),
        java.sql.Timestamp.class.getName(),
        java.lang.Long.class.getName() };

//6 - Id column position. 5 - Timestamp column position.
RowSpec rowSpec = new RowSpec(columnNames, columnTypes, 6, 5);

String inputStreamName = "test";
cruncher.registerInStream(inputStreamName, rowSpec);
Registering the "Running Query"
String rql = "select country, state, city, item_sku, item_qty, order_time, order_id from test (partition by item_sku store last 5 seconds max 5) as testStr where testStr.$row_status is not dead";

//DB specific. MySQL shown here.
String[] resultColumnTypes = { java.lang.String.class.getName() + ":SIZE=15",
        java.lang.String.class.getName() + ":SIZE=15",
        java.lang.String.class.getName() + ":SIZE=15",
        java.lang.String.class.getName() + ":SIZE=15",
        java.lang.Integer.class.getName(),
        java.sql.Timestamp.class.getName(),
        java.lang.Long.class.getName() };

String queryName = "test_res_rql";

ParserParameters parameters = new ParserParameters();
parameters.setQuery(rql);
parameters.setQueryName(queryName);
parameters.setResultColumnTypes(resultColumnTypes);

ParsedQuery parsedQuery = cruncher.parseQuery(parameters);

QueryConfig config = parsedQuery.getQueryConfig();

//Processing policy.
config.setQuerySchedulePolicy(new QueryConfig.QuerySchedulePolicyValue(
        QuerySchedulePolicy.ATLEAST_OR_SOONER, 5000));

cruncher.registerQuery(parsedQuery);
Inserting Events
It must be noted that the property of the Event that is used as the Id must always be unique and should, ideally, increase monotonically.
InputSession inputSession = cruncher.createInputSession("test");
inputSession.start();

for (; /* Run in a loop, to insert multiple Events */ ;) {
    //Create the Event.
    Object[] event = ....

    //Send the Events to StreamCruncher.
    inputSession.submitEvent(event);
}

inputSession.close();
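The construction of the Event itself is elided above (Object[] event = ....). A sketch of what it could look like for the "test" Stream registered earlier is shown below; the values must be in the same order as the RowSpec's columns, and the AtomicLong Id generator is purely illustrative:

import java.sql.Timestamp;
import java.util.concurrent.atomic.AtomicLong;

public class EventFactory {
    //Illustrative only; any source of unique,
    //monotonically increasing Ids will do.
    private static final AtomicLong ID_GENERATOR = new AtomicLong(1);

    public static Object[] createEvent() {
        //Same order as the RowSpec: country, state, city,
        //item_sku, item_qty, order_time, order_id.
        return new Object[] { "US", "California", "San Jose", "warp-drive",
                Integer.valueOf(5), new Timestamp(System.currentTimeMillis()),
                Long.valueOf(ID_GENERATOR.getAndIncrement()) };
    }
}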
Retrieving the generated/Output Stream Events
It is recommended that Input and Output processing be handled by separate Threads to maximize throughput.
String queryName = "test_res_rql";

OutputSession outputSession = cruncher.createOutputSession(queryName);
outputSession.start();

for (; /* Keep polling until more results are expected */ ;) {
    List<Object[]> events = outputSession.readEvents(10, TimeUnit.SECONDS);
    if (events.size() == 0) {
        continue;
    }
    ..
    ..
    System.out.println("Got few Output Events");
    System.out.println("Retrying for more results...");
}

outputSession.close();
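To tie the two halves together, here is a minimal sketch - using only the API calls shown above - of running the insert loop and the read loop on separate Threads, as recommended:

import streamcruncher.api.StreamCruncher;

public class PumpAndDrain {
    public static void main(String[] args) throws Exception {
        StreamCruncher cruncher = new StreamCruncher();
        cruncher.start(System.getProperty("sc.config.file"));

        //...register Streams and Queries as shown earlier...

        Thread producer = new Thread(new Runnable() {
            public void run() {
                //Input loop from "Inserting Events" goes here.
            }
        }, "event-producer");

        Thread consumer = new Thread(new Runnable() {
            public void run() {
                //Output loop from "Retrieving the generated/Output
                //Stream Events" goes here.
            }
        }, "event-consumer");

        producer.start();
        consumer.start();

        producer.join();
        consumer.join();

        cruncher.stop();
    }
}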
Customization
Custom Aggregate Functions can be created and plugged into the Kernel. This involves extending 2 Classes - streamcruncher.api.aggregator.AbstractAggregator and streamcruncher.api.aggregator.AbstractAggregatorHelper.
The Query below uses a custom Function called test_fn:
select country, sum_qty, test_fn_val from test (partition by country store last 5 seconds with sum(item_qty) as sum_qty, custom(test_fn, order_id, J) as test_fn_val) as testStr where testStr.$row_status is new;
The first element within the brackets of the custom(..) keyword is the name of the Function under which it is registered with the Kernel. It can take any number of comma-separated elements after that, which get passed on to the Function at run-time.
The example Function has a return type of varchar(2), which is indicated by the Helper class.
import java.util.LinkedHashMap;

import streamcruncher.api.DBName;
import streamcruncher.api.aggregator.AbstractAggregatorHelper;

public class TestAggregatorHelper extends AbstractAggregatorHelper {
    public TestAggregatorHelper() {
        super("test_fn", TestAggregator.class);
    }

    @Override
    public String getAggregatedColumnDDLFragment(DBName dbName, String[] params,
            LinkedHashMap<String, String> columnNamesAndTypes) throws Exception {
        return "varchar(2)";
    }
}
TestAggregator, shown here, is configured in the Query to use the values in the order_id column. Obviously meant for demonstration purposes, the example is rather contrived. It keeps a running sum of the integer parts of the column values, divides that sum by 26 and converts the resulting remainder into an ASCII character. This is then concatenated with the "J" that is supplied in the Query. Thus, the Function returns a 2 character String by performing some calculation on the values in the Window. For instance, if the order_id values in the Window sum to 40, the remainder is 40 % 26 = 14, which maps to the character 'O' (ASCII 65 + 14 = 79), so the Function returns "OJ".
The Kernel can create any number of instances of the Function. Instances should not be shared by way of Singletons etc. Each instance is initialized by the Kernel, which provides it with all the Column names and their Database types present in the Stream. TestAggregator uses this to locate the position of the order_id column it has been configured to use.
At the end of every Query execution, the instance is invoked if the Window contents have changed. It is invoked with the column values of the Events that were expelled from the Window as well as those that were added in the current run.
import java.util.LinkedHashMap;
import java.util.List;

import streamcruncher.api.aggregator.AbstractAggregator;

public class TestAggregator extends AbstractAggregator {
    private int columnPosition;

    private int sumOfOrderIds;

    private char suffix;

    @Override
    public void init(String[] params, LinkedHashMap<String, String> columnNamesAndTypes,
            AggregationStage aggregationStage) {
        super.init(params, columnNamesAndTypes, aggregationStage);

        //Locate the position of the configured column (params[0]).
        for (String column : columnNamesAndTypes.keySet()) {
            if (params[0].equalsIgnoreCase(column)) {
                break;
            }
            columnPosition++;
        }

        suffix = params[1].charAt(0);
    }

    @Override
    public String aggregate(List<Object[]> removedValues, List<Object[]> addedValues) {
        if (removedValues != null && getAggregationStage() != AggregationStage.ENTRANCE) {
            for (Object[] objects : removedValues) {
                Object object = objects[columnPosition];
                // Consider only non-nulls.
                if (object != null && object instanceof Long) {
                    Long l = (Long) object;
                    sumOfOrderIds = sumOfOrderIds - l.intValue();
                }
            }
        }

        if (addedValues != null) {
            for (Object[] objects : addedValues) {
                Object object = objects[columnPosition];
                // Consider only non-nulls.
                if (object != null && object instanceof Long) {
                    Long l = (Long) object;
                    sumOfOrderIds = sumOfOrderIds + l.intValue();
                }
            }
        }

        int i = (sumOfOrderIds % 26) + 65;
        char c = (char) i;
        return new String(new char[] { c, suffix });
    }
}
Before the Query that uses the custom Function is registered, the Helper class must be registered with the Kernel.
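A registration call might look like the snippet below. The method name here is an assumption for illustration - the JavaDocs have the authoritative signature - but the Helper must in any case be registered before parseQuery/registerQuery is invoked for the Query that uses test_fn:

//Method name assumed for illustration; see the JavaDocs for the exact API.
cruncher.registerAggregatorHelper(new TestAggregatorHelper());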
StreamCruncher provides a variety of Aggregate Functions. In some cases, the change (Difference) in the value of the Aggregate is more useful than the Aggregate itself. For most in-built Aggregates other than the max and min Functions, an additional directive called $diff can be specified, which tells the Function to spit out the difference between the current and the previous aggregate value. In some cases, this simple difference will not suffice. In Temperature control systems, for example, it is more useful to take the highest value that is considered safe and use that as a baseline against which the latest aggregate is compared. Or, for Stock value changes, the highest value in the last 1 hour. For such cases, StreamCruncher allows custom Baseline Providers to be plugged in.
select columns.. from ping_event (partition by levela, levelb store last 5 seconds with pinned count(event_id $diff) as change, count(event_id $diff 'DiffBaseline/CountBL') as change2, count(event_id) as plain_count) as heartbeat where ..;
Each Baseline Provider must be registered with the Kernel before the Query is registered.
cruncher.registerProvider("DiffBaseline/CountBL", CountDiffBaselineProvider.class);
cruncher.registerProvider("DiffBaseline/AvgBL", StatsDiffBaselineProvider.class);
public class CountDiffBaselineProvider extends DiffBaselineProvider<Integer> {
    public CountDiffBaselineProvider() {
        super();
        System.out.println("Using CountDiffBaselineProvider");
    }

    /**
     * @return Fixed Baseline. Always Zero.
     */
    @Override
    public Integer getBaseline(Integer oldValue, Integer newValue) {
        return 0;
    }
}

public class StatsDiffBaselineProvider extends DiffBaselineProvider<Double> {
    public StatsDiffBaselineProvider() {
        super();
        System.out.println("Using StatsDiffBaselineProvider");
    }

    /**
     * @return Fixed Baseline. Always Zero.
     */
    @Override
    public Double getBaseline(Double oldValue, Double newValue) {
        return 0.0;
    }
}
Baseline Providers must extend the streamcruncher.api.aggregator.DiffBaselineProvider class. Baseline Providers for the count Function must be based on the java.lang.Integer type. Similarly, java.lang.Double for the other Statistical Functions.
StreamCruncher allows different Windows in the same Partition to have different sizes. For example, take a Query such as .. from test (partition by country, state, city store last 10 seconds max 5) as stream1 .., into which 2 Events stream in - ("US", "California", "San Jose", "warp-drive", .. more properties) and ("India", "Karnataka", "Bangalore", "force-field", .. other props). The US > California > San Jose sub-Window can have a "20 second Window size with max 7 Events" while the India > Karnataka > Bangalore sub-Window can have a "10 second Window size with max 5 Events". The values defined in the Query are used as the default values.
select ..columns.. from test (partition by item_sku store last 10 seconds max 5 'TimeWindowSize/MyProv') as testStr where ..;
A custom WindowSize Provider is declared in the Query, as part of the Window definition clause. The provider must be given a logical name like "TimeWindowSize/MyProv" within single quotes when declared in the Query. The actual Provider has to be registered with the Kernel before the Query is registered.
2 kinds of Window Size Providers exist - streamcruncher.api.TimeWindowSizeProvider and streamcruncher.api.WindowSizeProvider - based on whether the Partition uses a Time based Window or not. The Query still needs a default size to be specified along with the name of the Provider.
A custom Provider would look like this:
import streamcruncher.api.TimeWindowSizeProvider;

public class CustomWindowSizeProvider extends TimeWindowSizeProvider {
    /**
     * @param levelValues
     *            Partition level values.
     */
    @Override
    public long provideSizeMillis(Object[] levelValues) {
        if (levelValues[0].equals("US") && levelValues[1].equals("California")
                && levelValues[2].equals("San Jose")) {
            // 20 second Window.
            return 20 * 1000;
        }

        // Default for the others.
        return super.provideSizeMillis(levelValues);
    }

    /**
     * @param levelValues
     *            Partition level values.
     */
    @Override
    public int provideSize(Object[] levelValues) {
        if (levelValues[0].equals("US") && levelValues[1].equals("California")
                && levelValues[2].equals("San Jose")) {
            // 7 size Window.
            return 7;
        }

        // Default for the others.
        return super.provideSize(levelValues);
    }
}
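Presumably the same registerProvider call shown earlier for Baseline Providers serves here too (an assumption - consult the JavaDocs); the logical name must match the one declared in the Query:

//Assumed to mirror the Baseline Provider registration shown earlier.
cruncher.registerProvider("TimeWindowSize/MyProv", CustomWindowSizeProvider.class);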
The Kernel instantiates Providers using the empty Constructor and creates a new instance for each Partition level where one is declared.
Persistence and Event purging
StreamCruncher can be shut down and restarted without having to re-register the Queries and Event Streams. They will be loaded automatically when the Kernel starts again. However, if the Database retains its artifacts after a restart, those artifacts will be re-created by StreamCruncher and any data in them will consequently be deleted.
StreamCruncher does not store the Input and Output Streams for longer than the Kernel requires them. As a result, the Events should not be expected to be available after the Kernel has consumed/generated them. If the Events have to be persisted, then it must be done outside the Kernel, by the User.
We have already seen how Partitions create smaller Windows based on the columns/properties. If the Partition has been specified to use Sliding Windows or Random Events Windows or Highest/Lowest values Windows, then these Windows will stay in Memory as long as the Kernel is running. In Time based or Tumbling Windows however, the Window is destroyed if all the Events in the Window expire. Such Windows are re-created by the Kernel if new Events arrive that need to be placed in the Window that was destroyed earlier. Aggregates defined on such Windows also follow the lifecycle of their respective Windows.
Query Configuration and Tuning
Each Query that is registered has a Configuration class called streamcruncher.api.QueryConfig that can be used to fine tune the behaviour of the Query. It can be retrieved through the StreamCruncher API using the getQueryConfig(queryName) method or from the ParsedQuery object (ParsedQuery.getQueryConfig()) that was created during Query registration. All the settings can be changed even while the Query is running.
The first setting controls the Scheduling policy for the Query. There are 2 options:
1) Fixed, where the Query executes every 'x' milliseconds. This setting makes the Query run irrespective of whether the Events arrive or expire. This option is similar to scheduling a simple SQL Query in the Database. This setting is not suitable for Time and Tumbling Windows as the Query may not execute immediately after an Event's expiry.
2) Atleast-or-sooner allows the Query to execute whenever an Event arrives or expires from a Window. If a Window has at least one Event pending consumption, then the Query runs in immediate succession. The consumption pattern is, however, decided by the nature of the Window. If none of these scenarios arise, the Query executes at the interval specified.
getQuerySchedulePolicy()
setQuerySchedulePolicy(querySchedulePolicyValue)

enum QuerySchedulePolicy {
    FIXED, ATLEAST_OR_SOONER;
}

QuerySchedulePolicyValue(policy, timeMillis)
If the Query is running with the Atleast-or-sooner option, then the Event-weight option allows finer control over Query triggering. A weight can be specified for Events from each Stream that feeds the registered Query. Whenever the combined weight of all pending Events goes above zero, the Query is triggered.
For example: suppose Query QRY1 consumes Events from 2 Streams, A and B. Events from Stream A are more frequent, but the information carried by 5 of them has to be held and correlated with 1 Event from B. So, those 5 Events either have to be held in the Window or can be made to wait for the main Event from B. The Events can be held back from triggering the Query individually by setting A's Event weight to -1 and B's to 6. The 5 Events from A add up to -5, and "(5 x A) + B = (5 x -1) + 6 = 1", so the Query fires when the Event from B arrives because the total weight goes above zero. However, if the Query has a Time based or Tumbling Window, the Weights do not stop the Query from executing when Events expire from those Windows.
Another option would be to assign a weight of 0 to A and 1 to B. This way, Events from A are not allowed to trigger the Query.
float getUnprocessedEventWeight(key)
setUnprocessedEventWeight(key, weight)
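For the QRY1 example above, the weights might be configured as follows - a sketch which assumes that the key parameter is the name of the input Event Stream:

QueryConfig config = cruncher.getQueryConfig("QRY1");

//Assumes the key is the input Stream's name.
config.setUnprocessedEventWeight("A", -1.0f);
config.setUnprocessedEventWeight("B", 6.0f);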
If a Query is scheduled to run within the next few Milliseconds (Atleast-or-sooner only), then this setting prevents an Event from triggering the Query if the time left is less than the margin specified.
long getForceScheduleMarginMsecs()
setForceScheduleMarginMsecs(forceScheduleMarginMsecs)
The StreamCruncher Kernel is multi-threaded. If a Query execution takes too long, then StreamCruncher starts issuing warnings and, after a few warnings, kills the Query's current run because it assumes that something has gone wrong. The Query's longest expected execution time should be analysed and this value set accordingly, to prevent unwarranted terminations.
getStuckJobInterruptionTimeMsecs()
setStuckJobInterruptionTimeMsecs(stuckJobInterruptionTimeMsecs)
Queries can be paused and resumed at will. When a Query is paused, the Kernel checks every few seconds whether that Query has been resumed. If the Query is going to be paused for a long time, then the resume-check interval should be increased suitably to conserve CPU resources.
long getResumeCheckTimeMsecs()
setResumeCheckTimeMsecs(resumeCheckTimeMsecs)
Sliding Windows move one Event at a time. So, if the Event Stream floods the System with too many Events, the System will slow to a crawl because the Query's Sliding Window has to consume and process one Event at a time. If older Events can be dropped without fear of losing too much information, then this option allows the Query to keep up with the incoming Stream of Events by jumping towards the most recent Events. So, if there are 1000 pending Events, the pending-events-allowed is set to 30 and the Sliding Window's size is 50, then the first 920 Events are skipped, the Window consumes Events 921 to 970 at once and the remaining Events, 971 to 1000, are consumed one at a time.
int getAllowedPendingEvents(key)
setAllowedPendingEvents(key, events)
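A sketch of applying the setting from the 1000-Event example above, again assuming the key is the input Stream's name:

QueryConfig config = cruncher.getQueryConfig("test_res_rql");

//Assumes the key is the input Stream's name. Allow at most
//30 pending Events to be consumed one at a time.
config.setAllowedPendingEvents("test", 30);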
Since StreamCruncher uses an RDBMS underneath, all the performance tips that one would normally use to write SQL queries apply here too. StreamCruncher automatically creates an Index on the Primary-key/Event Id column of the Output Event Stream/Table.
1) The $row_status column in Partitions internally relies on an Indexed column. Such Indexed columns must be used to speed up Queries.
2) The first condition in the where-clause on the Stream should be the one that filters out as many rows as possible.
3) When Events are being correlated, which is similar to SQL Joins, the "Driver Table/Event Stream" should be the Table/Stream that presents the smallest number of Rows to the Join operation.
For example, the fulfillment_events.$row_status is new condition picks up only the latest Events for the Join operation with order_events.
Pre-Filters can be attached to an Input Event Stream, and they can also look up dynamic data using the (not) in .. clause. Since such reference data does not change frequently, the Kernel caches the results and shares the cached data among other Queries that use the same SQL in their Pre-Filters. The Kernel therefore provides a way to specify how often the data must be refreshed from the Database.
ResultSetCacheConfig getResultSetCacheConfig(sqlSubQuery)

long getRefreshIntervalMsecs()
setRefreshIntervalMsecs(refreshIntervalMsecs)
The Kernel provides an API to retrieve the SQL Sub-Queries in a "Running Query" that will be cached by the Kernel when it gets registered. This is useful for retrieving the Cache configuration object to adjust the refresh intervals.
ParsedQuery parsedQuery = cruncher.parseQuery(parameters);

//Iterate through the list. Could be empty if
//there are no Pre-Filters with Sub-Queries.
for (String sql : parsedQuery.getCachedSubQueries()) {
    ResultSetCacheConfig cacheConfig = cruncher.getResultSetCacheConfig(sql);
    cacheConfig.setRefreshIntervalMsecs( /* set to some interval */ );
}
Kernel Configuration and Tuning
StreamCruncher allows various aspects of the Kernel to be tuned. It splits Query processing into stages; each stage relies on a Thread-pool, and some of these stages are processed by multiple Threads in parallel. The Kernel itself is multi-threaded, so all the Queries registered in the System are handled in parallel. This helps the Kernel scale well on multi-core/multi-processor Hardware. It also uses a Connection Pool to interact with the underlying Database.
Some of the properties are specific to the Database being used. There are templates for each of the Databases supported by StreamCruncher. These settings can be tuned, and the path to the file has to be passed to the API during startup.
The Database Driver, Id and Password are also specified in this file. If the Database has been configured to preserve artifacts such as Tables, Indexes etc even after a Server restart (if such an option is available), then the db.preservesartifacts.onshutdown property in the configuration must be set to true. This is an indication to the Kernel to drop and re-create them while restarting. If the Database is configured to run in a private, embedded mode, then some such Databases require at least one connection to be kept open throughout the life of the Program, otherwise the Database disappears. The db.privatevolatile.instance option is used to let the Kernel know that it has to keep one connection open until it is shut down.
In addition to the Database Driver, User name and other properties, the db.schema property can be used to specify the Database Schema in which the Kernel must create its internal artifacts.
The tuning parameters listed below must be carefully tested and measured before being applied. The same applies to the Query, Input and Output Stream tuning.
The maximum number of Connections that will be created and held in the Pool is set with db.connectionpool.maxsize.
Event Ids that are pumped into the System are processed by the "instreameventprocessor" module. The Threads in this module keep checking for new Events and notify the Query Scheduler module, which is next in line. Every time such a Thread is woken up on Event arrival, it stays awake for some time, looking for new Events from all registered Streams, and then goes back to sleep for a while. That is what the 3 options are for - instreameventprocessor.threads.num, instreameventprocessor.thread.empty-runs-before-pause and instreameventprocessor.thread.pause.msecs.
Scheduling and Execution are also performed separately and can be configured using queryscheduler.threads.num and queryrunner.threads.num. Each of these 2 settings should be close to the total number of Queries that will be running in the Kernel. Another setting - jobexecution.threads.num - controls the Thread-pool that handles parallel execution of the Partitions in a Query.
Since StreamCruncher can automatically purge old Events from the Input Streams, if they have been configured for it, the rowdisposer.thread.pause.msecs option indicates how often this Auto-purge task should run.
Since Pre-Filters with Sub-Queries are cached by the Kernel, the number of Threads used to maintain/refresh these cached Queries can be changed from the default value if the volume of cached data and the number of unique Queries is large. cacherefresh.threads.num should be used to change the number of these processing Threads.
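Pulling the properties mentioned above into one place, a configuration file might contain entries like these. The property names are the ones described in this section, but all values shown are arbitrary illustrations, and the Database Driver/Id/Password entries (whose exact key names depend on the template for the chosen Database) are omitted:

# All values below are illustrative, not recommendations.
db.schema=SC
db.preservesartifacts.onshutdown=true
db.privatevolatile.instance=false
db.connectionpool.maxsize=10

instreameventprocessor.threads.num=2
instreameventprocessor.thread.empty-runs-before-pause=25
instreameventprocessor.thread.pause.msecs=400

queryscheduler.threads.num=4
queryrunner.threads.num=4
jobexecution.threads.num=4

rowdisposer.thread.pause.msecs=15000

cacherefresh.threads.num=2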
The Kernel uses the System clock as a reference to check if Events are going to expire and also to stamp the generated Output Events with the time of occurrence. If the Events arriving at the Kernel are out of sync with the clock of the computer on which the Kernel is running, or if for some reason the Kernel-stamped Events have to be in sync with another system, then the Kernel can be set with a bias that gets added to the System clock's time every time the Kernel needs the latest time.
setTimeBiasMsecs(timeBiasMsecs)
long getTimeBiasMsecs()