Testing Datahub for Certification/Safety/Fraud management - Time meets semantics (2/2)

Some time ago I wrote the first part of this blog. In this second part I will detail how the temporal query engine is actually implemented in MarkLogic.
The objective of the engine is to identify periods of time during the Tests when multiple conditions are met, and especially when a temporal pattern is realised.

This blog is quite technical, with some advanced concepts which are not that easy to explain, especially for a non-native English speaker :). Don't hesitate to contact me if you want more details.

The data loaded in MarkLogic

Context sources

First we have data sources which provide metadata about the context of the Tests. Depending on the use case, these could be:
  • Weather data
  • Test conditions
  • Crew/expert reports: documents/unstructured content
  • Configuration of the product during the test
  • Etc.
In MarkLogic these sources can be structured or unstructured data and may also contain geospatial shapes. The data is stored in documents (XML or JSON).

The time series data

I would say MarkLogic is not a good fit for massive raw time series. There are specialised databases on the market to deal with the volume and optimise the storage; QuasarDB, from a French vendor, is one example.
However, MarkLogic is great at mixing heterogeneous datasets and making the most of them through transversal queries across these datasets.

So how can we manage the time series in MarkLogic?

For this use case, we don't need raw time series: the objective is to identify, with good precision, the periods that match specific conditions, and then deep dive into the raw data of the identified periods. MarkLogic helps in identifying the periods; the raw exploration can be performed with a time series DB.
That's why we can apply sampling in order to ingest, not raw point-in-time measures, but periods during which we consider a sensor to have a particular value. For each sensor we want to leverage, we ingest periods with a start, an end, the value of the sensor and the related Test reference. The value can be a scalar, but it could also be something more complex, such as text (e.g. the speech of an actor) or geospatial coordinates. The value can also be a boolean, in which case the periods are the consecutive periods with value=true or value=false (e.g. car auto-pilot engaged).
In MarkLogic, each event is stored as a document (JSON or XML) with the data stored inside.
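Here is a minimal sketch, in MarkLogic Server-Side JavaScript, of what one such event document could look like; the property names (id, testRef, sensor, value, start, end) are illustrative assumptions, not a prescribed model.

declareUpdate();
// One sampled period for one sensor, attached to its Test.
xdmp.documentInsert('/events/evt-0001.json', {
  event: {
    id: 'evt-0001',
    testRef: 'TEST-42',              // the Test this period belongs to
    sensor: 'speed',                 // the sensor being sampled
    value: 182,                      // scalar here; could be text, boolean or coordinates
    start: '2024-05-04T10:12:00Z',   // beginning of the period
    end: '2024-05-04T10:14:30Z'      // end of the period
  }
}, { collections: ['events'] });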

We now have contextual data and time series in MarkLogic stored in documents. 


This is already enough to perform some quite advanced queries, but we are missing something for the temporal pattern: triples.

Here comes the triple store

We use TDE (Template Driven Extraction) to lift some of the data from the event documents into the triple store. In order to manage temporal queries, we generate triples linking each event to the related Test, its start, its end and its value.
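As a sketch, a TDE template for the hypothetical event document above could look like this (the IRIs under http://ex.org/ are purely illustrative):

declareUpdate();
const tde = require('/MarkLogic/tde.xqy');

// Each rule lifts one property of the event document into a triple.
const template = xdmp.toJSON({
  template: {
    context: '/event',
    collections: ['events'],
    triples: [
      { subject: { val: "sem:iri('http://ex.org/event/' || id)" },
        predicate: { val: "sem:iri('http://ex.org/test')" },
        object: { val: "sem:iri('http://ex.org/test/' || testRef)" } },
      { subject: { val: "sem:iri('http://ex.org/event/' || id)" },
        predicate: { val: "sem:iri('http://ex.org/start')" },
        object: { val: "xs:dateTime(start)" } },
      { subject: { val: "sem:iri('http://ex.org/event/' || id)" },
        predicate: { val: "sem:iri('http://ex.org/end')" },
        object: { val: "xs:dateTime(end)" } },
      { subject: { val: "sem:iri('http://ex.org/event/' || id)" },
        predicate: { val: "sem:iri('http://ex.org/value')" },
        object: { val: "value" } }
    ]
  }
});

tde.templateInsert('/templates/event-triples.json', template);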


We are now ready to run the temporal engine.

The temporal query engine logic

Reducing the scope

We are talking about use cases that can manipulate millions of events, so before trying to find a temporal pattern in the events, it's better to reduce the scope.
MarkLogic being a multi-model database, we will make the most of it.

First, we generally have conditions that can be applied to the contextual metadata. The engine therefore starts by querying the context sources in order to identify the eligible Tests, the ones that match all the conditions on all the context metadata.
To do that, we apply the conditions to each context source, find the positive results and then take the intersection of the Tests related to those positive results.
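A minimal sketch of this step, assuming each context source document carries a testRef property with a range index on it (both assumptions for illustration):

// One cts query per context condition; collections and properties are made up.
const conditions = [
  cts.andQuery([cts.collectionQuery('weather'),
                cts.jsonPropertyRangeQuery('temperature', '<', 0)]),
  cts.andQuery([cts.collectionQuery('configurations'),
                cts.jsonPropertyValueQuery('tyres', 'winter')])
];

// Collect the Tests matching each condition and intersect the sets.
let eligibleTests = null;
for (const condition of conditions) {
  const tests = new Set(
    cts.values(cts.jsonPropertyReference('testRef'), null, [], condition)
       .toArray().map(String));
  eligibleTests = (eligibleTests === null)
    ? tests
    : new Set([...eligibleTests].filter(t => tests.has(t)));
}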


Note: in this example, we only consider context sources that have a direct relation with the Tests (let's say the Test is a property of the context source).
By applying the concept I illustrated in this repo https://github.com/epoilvet/entityExporterService, we could also apply conditions on context metadata that have an indirect relation with the Test.



We now have a list of Tests which are eligible for temporal pattern matching. It's time to apply the temporal matching logic.

Searching for temporal patterns 

This is, for example, the pattern we are looking for. It can happen at any time in one or more of our eligible Tests:
We first want 3 different events to happen at the same time; then, at most 5 minutes later, another event must happen; and then, at most 2 minutes later, another one.


In the application we developed, the query (like the one in the diagram) was drawn using the vis.js library. Events are represented as nodes, and edges are the temporal relations between Events (Intersection = must happen at the same time; Sequence = must follow within a max delay). We could of course imagine other types of temporal relations (end-to-start, start-to-start, etc., similar to project management dependencies).
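A sketch of how the pattern above could be serialised from the vis.js graph (node and edge attributes are illustrative assumptions):

const pattern = {
  nodes: [
    { id: 'A', eventType: 'Speed',     condition: { op: '>', value: 150 } },
    { id: 'B', eventType: 'Altitude',  condition: { op: '<', value: 500 } },
    { id: 'C', eventType: 'AutoPilot', condition: { op: '=', value: true } },
    { id: 'D', eventType: 'Alarm' },
    { id: 'E', eventType: 'Braking' }
  ],
  edges: [
    { from: 'A', to: 'B', relation: 'intersection' },              // same time
    { from: 'B', to: 'C', relation: 'intersection' },              // same time
    { from: 'C', to: 'D', relation: 'sequence', maxDelay: 'PT5M' }, // within 5 min
    { from: 'D', to: 'E', relation: 'sequence', maxDelay: 'PT2M' }  // within 2 min
  ]
};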

Here is the algorithm that was implemented with MarkLogic:

Filtering strategy

To make the query efficient, it's important to leverage the multi-model capabilities of MarkLogic. We are going to execute a SPARQL query on the triple store, but we can pre-filter the events based on document conditions (the documents from which the triples are lifted using TDE). That way, the SPARQL query will be executed only on the pre-filtered triples and not on the overall triple store.

The first filter we create is based on the eligible Tests we identified from the context conditions: a document query that keeps only the event documents linked to these Tests.
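In MarkLogic Server-Side JavaScript, this pre-filtering can be sketched with sem.store(), which restricts a SPARQL execution to triples coming from documents matching a cts query (property names are the hypothetical ones used above):

const sem = require('/MarkLogic/semantics.xqy');

// Keep only triples lifted from event documents of the eligible Tests.
const testFilter = cts.andQuery([
  cts.collectionQuery('events'),
  cts.jsonPropertyValueQuery('testRef', [...eligibleTests])
]);

const store = sem.store([], testFilter);
// Placeholder: the real generated pattern query is built in the next steps.
const sparqlPattern = 'SELECT * WHERE { ?s ?p ?o } LIMIT 10';
const results = sem.sparql(sparqlPattern, null, [], store);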

Then we want to keep only the events in scope of the temporal pattern.

What we want: execute the SPARQL query only on the events defined in the pattern.
Strategies:
  • Document filtering: if a specific Event type appears only once in the query (e.g. a single Event of type Speed), we can create a document query that keeps only the triples from documents matching this specific condition (e.g. Events with Speed > 150km/h). With this document filter, all the Events of type Speed seen by the SPARQL query already match the condition, so there is no need to repeat in the SPARQL query that the value of the Event must be > 150km/h.
  • SPARQL filtering: if the query contains the same Event type multiple times but with different conditions (e.g. one Event with Speed > 150km/h and another with Speed < 40km/h), then the only way to match the right Event at the right place in the pattern is to apply the filtering in the SPARQL itself. We can still add a document query with Speed > 150km/h OR Speed < 40km/h: it keeps only the Events in scope of the query, but they cannot be differentiated by their type alone.

If we combine the filters from all the Event types, we end up with a document query like this: (Event Type A / value=XX) OR (Event Type B / value=YY) OR (Event Type C / value=ZZ OR value=TT). This keeps in the scope of the SPARQL query only the Events that match the conditions we have in the pattern.
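As a sketch, the combined document filter for the example pattern could look like this (sensor names and range indexes are assumptions):

// One OR branch per Event type of the pattern, with its value condition.
const eventScopeFilter = cts.orQuery([
  cts.andQuery([cts.jsonPropertyValueQuery('sensor', 'speed'),
                cts.jsonPropertyRangeQuery('value', '>', 150)]),
  cts.andQuery([cts.jsonPropertyValueQuery('sensor', 'altitude'),
                cts.jsonPropertyRangeQuery('value', '<', 500)]),
  cts.andQuery([cts.jsonPropertyValueQuery('sensor', 'autopilot'),
                cts.jsonPropertyValueQuery('value', true)])
]);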

Intersection group detection

The first thing we need to do is to identify the "intersection groups" of the query. In the example above, the left group (Event Type A / Event Type B / Event Type C) is an intersection group: Events that must share a period of time.
To detect the intersection groups, we load the query graph (from the vis.js graph serialisation) into an in-memory triple store. With the graph in memory, it's then easy to run a SPARQL query that detects all groups of nodes connected by a "same time" relation.
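A sketch of this detection, reusing the pattern object from the serialisation sketch above (IRIs are again illustrative):

const sem = require('/MarkLogic/semantics.xqy');

// Turn the "same time" edges of the pattern graph into triples.
const graphTriples = pattern.edges
  .filter(e => e.relation === 'intersection')
  .map(e => sem.triple(sem.iri('http://ex.org/node/' + e.from),
                       sem.iri('http://ex.org/sameTime'),
                       sem.iri('http://ex.org/node/' + e.to)));

const inMemory = sem.inMemoryStore(graphTriples);

// Pairs of nodes that must share a period of time, in either direction;
// merging the pairs into connected groups is then done in JavaScript.
const pairs = sem.sparql(`
  SELECT ?a ?b WHERE {
    { ?a <http://ex.org/sameTime> ?b } UNION { ?b <http://ex.org/sameTime> ?a }
  }`, null, [], inMemory);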

Once the intersection groups are extracted, we can run the main query: a SPARQL query that applies all the constraints of the temporal pattern.

Intersection

With the groups we identified, we can generate a SPARQL subquery that calculates the intersections each time we have an Event A / Event B / Event C (with their related value conditions) sharing a period of time. In short, it's a GROUP BY where we select max(start) and min(end) over the events of type A, B and C.
This subquery produces all the intersections that match the criteria we want.
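A sketch of such a subquery, with the hypothetical predicates from the TDE sketch above. The description is a GROUP BY over max(start)/min(end); since each solution row already carries one (a, b, c) combination, an equivalent row-wise formulation with nested IFs is shown here:

const intersectionSubquery = `
  PREFIX ex: <http://ex.org/>
  SELECT ?test ?groupStart ?groupEnd WHERE {
    ?a ex:test ?test ; ex:start ?sA ; ex:end ?eA .
    ?b ex:test ?test ; ex:start ?sB ; ex:end ?eB .
    ?c ex:test ?test ; ex:start ?sC ; ex:end ?eC .
    FILTER (?a != ?b && ?a != ?c && ?b != ?c)
    # intersection start = latest of the three starts
    BIND (IF(?sA > ?sB, ?sA, ?sB) AS ?s1)
    BIND (IF(?s1 > ?sC, ?s1, ?sC) AS ?groupStart)
    # intersection end = earliest of the three ends
    BIND (IF(?eA < ?eB, ?eA, ?eB) AS ?e1)
    BIND (IF(?e1 < ?eC, ?e1, ?eC) AS ?groupEnd)
    # keep only non-empty intersections
    FILTER (?groupStart < ?groupEnd)
  }`;

In practice ?a, ?b and ?c would also be constrained to their Event types, either in the SPARQL or through the document filtering strategy described above.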

Sequence constraints

Now that the intersection groups are replaced by subqueries producing a new calculated Event, whose start/end correspond to the intersection of the inner events, we can simply create a SPARQL query with conditions between the end of each event and the start of the next one in the overall pattern (with the max delay we are looking for).
If we take the initial example, it would be something like this:
(Intersection group Event).end + 5min > (Event Type D).start
AND (Intersection group Event).end < (Event Type D).start

(Event Type D).end + 2min > (Event Type E).start
AND (Event Type D).end < (Event Type E).start
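As SPARQL, these conditions become FILTER clauses, sketched below; ?groupStart/?groupEnd come from the intersection subquery, and ?sD/?eD/?sE are the start/end of events D and E. Note that dateTime + dayTimeDuration arithmetic is an XPath-style extension supported by some SPARQL engines, an assumption here:

const sequenceFilters = `
    FILTER (?sD > ?groupEnd)                                # D starts after the group ends
    FILTER (?sD < ?groupEnd + "PT5M"^^xsd:dayTimeDuration)  # ...but within 5 minutes
    FILTER (?sE > ?eD)                                      # E starts after D ends
    FILTER (?sE < ?eD + "PT2M"^^xsd:dayTimeDuration)        # ...but within 2 minutes
`;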

The main query will be one big SPARQL query, generated by assembling the intersection group subqueries and the sequence constraints.

Execution

The logic produces a big SPARQL query we can run on the pre-filtered triple store.
One more thing: as we know the list of eligible Tests, we can split the execution and distribute it in order to avoid any single query over a massive amount of data. We can create batches of Tests and run the SPARQL only on the Tests of each batch. For this we again leverage multi-model capabilities, filtering the triple store down to the triples coming from the events of the batch's Tests.
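A minimal sketch of this batched execution, reusing eligibleTests and eventScopeFilter from the earlier sketches (finalSparql stands for the generated main query):

const sem = require('/MarkLogic/semantics.xqy');
const batchSize = 50;
const tests = [...eligibleTests];

const matches = [];
for (let i = 0; i < tests.length; i += batchSize) {
  const batch = tests.slice(i, i + batchSize);
  // Restrict the triple store to the events of this batch's Tests
  // that are also in scope of the pattern conditions.
  const store = sem.store([], cts.andQuery([
    eventScopeFilter,
    cts.jsonPropertyValueQuery('testRef', batch)
  ]));
  for (const row of sem.sparql(finalSparql, null, [], store)) {
    matches.push(row);
  }
}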

More to come

This was an overview of the algorithm of the temporal query engine we used on different projects.
In the end, the engine is essentially a SPARQL + document filtering query generator.
The implementation is quite generic and can be applied, with no major change, to industrial use cases as well as video media analysis, for example.
I expect to publish a generic implementation on my GitHub soon.



