Skip to content

Dependency tracking in the Datahub using MultiSource

In this tutorial we will go through an example case for when MultiSource should be used in a job. To follow along with the examles you need a datahub running and the datahub cli connected to it.

With that in place, lets bootstrap some sample data:

$ mim dataset create person
SUCCESS  Dataset 'person' has been created
$ echo '[{"id": "@context", "namespaces": {"_": "http://example/person/" }},
    {"id": "bob", "props": {"name": "Bob the builder"}},
    {"id": "mary", "props": {"name": "Mary Poppins"}},
    {"id": "shaun", "props": {"name": "Shaun the Sheep"}}
]' > /tmp/person.json && mim dataset store person /tmp/person.json
SUCCESS  Entities Loaded

What are dependencies in Datahub jobs

A job in Datahub is a pipeline with three (or two) stages: source ( => transform ) => sink. When data needs to be copied unmodified, only a source and a sink are needed. When the shape of the data needs to be modified or if the data needs to be enriched using queries, a transform stage can be added.

Lets set up an example job that copies the person dataset to a new dataset: employee

$ mim dataset create employee
SUCCESS  Dataset 'employee' has been created

$ echo '{ "id" : "employees", "title": "employees",
                "triggers": [{
                    "jobType": "incremental",
                    "triggerType": "cron",
                    "schedule": "@daily"
                }],
                "source" : {
                        "Type" : "DatasetSource",
                        "Name": "person"
                },
                "sink" : {
                        "Type" : "DatasetSink",
                        "Name": "employee"
                }
        }' | mim jobs add -
 SUCCESS  Read config file
 SUCCESS  Added job to server

Now run it once to perform an initial load of all person entities into employee:

$ mim jobs operate -o run employees

And lets confirm that 3 items were copied by looking at the last job run

$ mim jobs ls

# Listing server jobs on http://localhost:8080

Title     | Paused | Tags | Source  | Transform | Sink    | Last Run                  | Last Duration | Last Processed | Error
employees |  false |      | Dataset |           | Dataset | 2023-05-24T10:58:50+02:00 | 892.037µs     | 3              |

Pros and cons of incremental job execution

Incremental jobs are efficient

While a job pipeline can be run in a number of ways, the most efficient way is using incremental jobs on a schedule. A source in an incremental job will keep track of how far it has been consumed during an execution, storing a continuation token as job state in the datahub.

This way incremental jobs will only work on changes that are new since the last execution.

And if the schedule interval is matched appropriately, this often results in small batches of entities that are fed through the job pipeline on each execution. Which means very short job run times.

Lets run our job again. This time mim jobs ls should show that 0 entities were processed.

$ mim jobs operate -o run employees

Info

Note that the default mode for manual job runs is incremental, to run in fullSync mode we can add --jobType=fullsync to the command.

If we now add one change to the person dataset and run the job again, only that one change will be processed.

$ echo '[{"id": "@context", "namespaces": {"_": "http://example/person/" }},
    {"id": "waldo", "props": {"name": "Waldo from Where is Waldo"}}
]' > /tmp/person.json && mim dataset store person /tmp/person.json
$ mim jobs operate -o run employees
$ mim jobs ls

# Listing server jobs on http://localhost:8080

Title     | Paused | Tags | Source  | Transform | Sink    | Last Run                  | Last Duration | Last Processed | Error
employees |  false |      | Dataset |           | Dataset | 2023-05-24T11:12:41+13:00 | 804.071µs     | 1              |

Queries in incremental jobs create dependencies

In cases where jobs need to enrich their processed data, Datahub offers a transform stage where jobs can query into other datasets using the transform javascript API. The query results can then be combined with source data to produce an enriched output.

Lets add two more datasets to the datahub. An order dataset containing order items which point to an order customer and to the ordered product. We also need the catalog dataset containing products that can be ordered.

$ mim dataset create order
$ echo '[{"id": "@context", "namespaces": {"_": "http://example/order/", "p": "http://example/person/","c": "http://example/catalog/" }},
    {"id": "1", "props": {"name": "Order from Bob"}, "refs":{"customer": "p:bob", "product": "c:milk" }}
]' > /tmp/order.json && mim dataset store order /tmp/order.json
$ mim dataset create catalog
$ echo '[{"id": "@context", "namespaces": {"_": "http://example/catalog/" }},
    {"id": "milk", "props": {"name": "Milk", "fatPct": 1.5}}
]' > /tmp/catalog.json && mim dataset store catalog /tmp/catalog.json

And we also add a transform script to our job which queries into those datasets. When processing a person entity, the script first queries for orders that point to our person. If an order is found, we do a second query to find the product entity belonging to the order. Finally we add the found product to our outgoing employee entity as "likes" property.

Put this script in a file called transform.js:

export function transform_entities(entities) {
    entities.forEach((entity) => {
        // find orders pointing to this person entity
        const ordersRs = Query([GetId(entity)], "http://example/order/customer", true, ["person", "order"]);
        if (ordersRs.length == 0) {
            return;
        }
        const order = ordersRs[0][2];
        // find the product name of the order
        const product = Query([GetId(order)], "http://example/order/product", false, ["order", "catalog"])[0][2];
        const productName = GetProperty(product, GetNamespacePrefix("http://example/catalog/"), "name");
        // set the likes property on the employee entity
        SetProperty(entity, "http://example/person/", "likes", productName);
    });
    return entities;
}

and add it as transform stage to our job.

$ mim transform import -f transform.js employees

Lets now reset the job, so that we reprocess all entities, and run it again to perform the enrichment.

$ mim jobs operate -o reset employees
$ mim jobs operate -o run employees

When we look at the contents of the employee dataset now, using mim dataset entities employee, we should see that bob likes milk.

If a job produces enriched entities, which contain data both from the job source, and other datasets, then it is depending on those other datasets.

The dependency is that entities written to the sink are only current, as long as neither source dataset nor datasets that enrichments are fetched from are unchanged. If an entity in the source dataset receives a new change, the job must produce a new version of the enriched output entity to keep the sink current. And the same is true when changes in dependency datasets occur.

For our example this means that we should reprocess bob when his order is changed or deleted. Or when the milk product itself is changed or deleted. Because the change could mean that there is no proof of that bob likes milk anymore.

How to track dependencies

A regular DatasetSource only detects change in the configured source dataset. So we need another tool to detect change in dependency datasets as well: MultiSource

MultiSource allows to configure the main source dataset, and a list of dependency datasets. Each dependency dataset also needs information about how it is linked back to the main dataset. These links usually should correlate to queries in the jobs transform script.

Lets update our job, and make it use a MultiSource.

A MultiSource can be configured directly in the job configuration, but we want to use dependency registration via javascript.

So we update transform.js, and include a track_queries function in addition to the existing transform_entities.

export function track_queries(start) {
    start
        .iHop("order", "http://example/order/customer")
        .hop("catalog", "http://example/order/product");
}

And we change the source type in our job. The updated transform.js script can be uploaded at the same time using the -t option.

$ echo '{ "id" : "employees", "title": "employees",
    "triggers": [{"jobType": "incremental", "triggerType": "cron", "schedule": "@daily"}],
    "source" : {
            "Type" : "MultiSource",
            "Name": "person"
    },
    "sink" : {
            "Type" : "DatasetSink",
            "Name": "employee"
    }
}' | mim jobs add - -t transform.js

Since we changed the source type, we now need to reset the job state and run the job again to build a new internal continuation token as job state. While it works to use mim jobs operate -o reset employees followed by mim jobs operate -o run employees, you will notice that this way 6 employees are processed in the job run, while there are only 4 persons in total.

This is because bob is processed 3 times.

  • Once because he is in the main source, the person dataset.
  • Once because we track the catalog dataset, and the milk entity from catalog is linked back to Bob via an order.
  • Once because there is an order pointing to him, and we implicitly also track changes to orders because it is a transitive dependency.

To avoid this redundant processing, it is recommended to do the inital load of MultiSource jobs using a fullsync. Fullsync runs with MultiSource jobs will only process the main dataset (person in our examle), but also build a continuation token as job state for successive incremental runs.

So we run

$ mim jobs operate -o run employees --jobType=fullsync

At this point the job is ready to react to changes in dependency datasets. We put this to a test and do a change to the milk product.

$ echo '[{"id": "@context", "namespaces": {"_": "http://example/catalog/" }},
    {"id": "milk", "props": {"name": "Light Milk", "fatPct": 1.5}}
]' > /tmp/catalog.json && mim dataset store catalog /tmp/catalog.json

Now we run the job incrementally, and MultiSource should find the change in catalog, follow the dependency links from our track_queries registration back to Bob in the persion dataset, and process bob.

$ mim jobs operate -o run employee

To verify, we can check mim jobs history employees, where the processed attribute of the last run status should be 1, meaning one entity was processed. We can also check the employee dataset to verify that bob has a new liked product, it should be "Light Milk" now

$ mim dataset entities employees