Update Policies

13 Aug, 2024 | Brian

Brian is the person behind the dcode.bi site. He is keen on helping others be better at what they do around data and intelligence.

Update Policies

In Real-Time Intelligence in Fabric we have several options for loading data from one layer to the next. One of these options is Update Policies.

An update policy is a feature set which allows you to define the following:

  • Transformations from source to destination
  • Refresh rates
  • Error handling

This blog post gives an overview of the Update Policies and tries to help you get started.

The transformation

In an Update Policy you can add as many transformations as you need - the transformation is actually just a KQL statement. So everything you have in normal KQL queries, you also have for the query in the transformation.

An example statement could be something like this:

    NYCTaxi
    | project VendorID, RatecodeID, PULocationID, DOLocationID, fare_amount, passenger_count
    | join kind=inner Vendors on VendorID
    | project VendorID, RatecodeID, PULocationID, DOLocationID, fare_amount, passenger_count, VendorName
    | extend Ingestion_time = ingestion_time()

All you need to do is to add it to a KQL function like this:

.create function LoadTaxiDataToSilver() {
    NYCTaxi
    | project VendorID, RatecodeID, PULocationID, DOLocationID, fare_amount, passenger_count
    | join kind=inner Vendors on VendorID
    | project VendorID, RatecodeID, PULocationID, DOLocationID, fare_amount, passenger_count, VendorName
    | extend Ingestion_time = ingestion_time()
}

The above is the baseline transformation of what you would like to do with your data. The example loads the NYCTaxi data set from a bronze layer to a silver layer.

Note that I’ve added a new column named Ingestion_time which reads the time of ingestion from the underlying engine. To do that I need to enable the ingestion time policy on the source table with this command:

.alter table NYCTaxi policy ingestiontime true
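
As a quick sanity check - assuming you have the rights to view table policies - you can show the policy and spot-check the ingestion time on a few rows:

// Show the current ingestion time policy on the source table
.show table NYCTaxi policy ingestiontime

// Spot-check that the engine records an ingestion time for the rows
// (rows ingested before the policy was enabled return empty values)
NYCTaxi
| extend Ingestion_time = ingestion_time()
| take 10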

In the silver layer we then need to create the new table with columns corresponding to the output of the function.

.create table SilverNYCTaxi (
        VendorID:long
        ,RatecodeID:long
        ,PULocationID:long
        ,DOLocationID:long
        ,fare_amount:real
        ,passenger_count:real
        ,VendorName:string
        ,Ingestion_time:datetime
    )
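
Before wiring the two together, it can save some debugging time to confirm that the output schema of the function matches the destination table exactly - column names and types are case-sensitive. A minimal check, assuming the function and table above are in place, is to compare their schemas:

// Output schema of the transformation function
LoadTaxiDataToSilver()
| getschema

// Schema of the destination table - the two should match column for column
SilverNYCTaxi
| getschema

If the schemas differ, the data will not land in the destination table as expected.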

When the destination table is in place, I can then create the actual Update Policy with the script below:

.alter table SilverNYCTaxi policy update
@'[{
    "IsEnabled": true,
    "Source": "NYCTaxi",
    "Query": "LoadTaxiDataToSilver()",
    "IsTransactional": true,
    "PropagateIngestionProperties": false
}]'

Breakdown of the script:

  1. .alter table SilverNYCTaxi - this alters the destination table to have the update policy attached. The destination table must be in the same KQL database as the source table, but the query in the function can reference tables in other databases. If it does, you need to define the ManagedIdentity property in the JSON - see later in this list for details.
  2. IsEnabled - an update policy can be enabled or disabled. This is useful when making changes to the update policy, as sometimes we need to pause the loading process while the changes are made.
  3. Source - this is the name of the source table. This enables the engine to listen for new rows entering the table - the engine actually always listens for incoming rows, but that is a topic for a future post.
  4. Query - a reference to the KQL function I just made above. You can put a KQL statement directly in here, but maintaining that will not be easy.
  5. IsTransactional - just like SQL Server, the KQL engine can behave transactionally. If this is set to true and an error occurs during the load, the entire transaction rolls back and nothing is loaded. If it is not transactional, the rows from the transaction which were already saved in the destination table stay there, even if the transaction fails.
  6. PropagateIngestionProperties - states if properties specified during ingestion to the source table, such as extent tags and creation time, apply to the target table. Mostly used for debugging.
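
Once the policy is created, a quick way to confirm what is actually attached to the destination table is to show the policy:

// Show the update policy attached to the destination table
.show table SilverNYCTaxi policy update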

The update policy also has a ManagedIdentity property, which states the managed identity on behalf of which the update policy runs. The managed identity can be an object ID or the reserved word system. The update policy must be configured with a managed identity when the query references tables in other databases or tables with an enabled row level security policy.
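
As a rough sketch of how that could look - assuming a system-assigned managed identity and the same table and function names as above - the policy JSON gains one extra property:

// Same update policy as before, now running on behalf of the system managed identity
.alter table SilverNYCTaxi policy update
@'[{
    "IsEnabled": true,
    "Source": "NYCTaxi",
    "Query": "LoadTaxiDataToSilver()",
    "IsTransactional": true,
    "PropagateIngestionProperties": false,
    "ManagedIdentity": "system"
}]'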

The refresh rates

Normally the above should do the trick. But if I need more throughput or want to change when the ingestion happens, there is a nice little trick.

I can alter the source table to have a policy named ingestionbatching to tell the engine when I want the ingestion to happen. The policy takes three arguments which tell the engine when to start the ingestion process:

  1. MaximumNumberOfItems - When the number of new items in the table reaches or exceeds this number. The default is 500 with a maximum of 25,000.
  2. MaximumRawDataSizeMB - When the incoming raw data size in MB reaches or exceeds this number. The default here is 1024 with a maximum of 4096.
  3. MaximumBatchingTimeSpan - When the time span defined in this property has passed. The default here is 300 seconds (5 minutes) with a maximum of 1,800 seconds (30 minutes).

This means I can alter the source table if I need it - it could look like this:

.alter table NYCTaxi policy ingestionbatching
{
    "MaximumBatchingTimeSpan" : "00:00:10",
    "MaximumNumberOfItems" : 20,
    "MaximumRawDataSizeMB": 300
}

With these settings the batch is sealed after at most 10 seconds, or as soon as 20 new items or 300 MB of raw data have arrived - whichever comes first.
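
If you later want to check what is configured, or revert to the defaults again, the policy can be shown and deleted (sketched here for the same source table):

// Show the current ingestion batching policy on the source table
.show table NYCTaxi policy ingestionbatching

// Remove the custom policy so the database/cluster defaults apply again
.delete table NYCTaxi policy ingestionbatching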

The error handling

Out-of-the-box there is no automated error message or anything telling you that the process has errors or has failed during execution. This is where the .show ingestion failures command comes to the rescue.

When the Update Policy handles the data movement and it raises errors, it records the error for later debugging and error handling.

To get to the error messages I can run the .show ingestion failures command like below:

.show ingestion failures

But this gives me ALL the ingestion failures on the database. If you have a lot of tables and ingestions, you might be flooded with information.

Luckily the command accepts the “| where” clause.

So I can execute this command instead:

.show ingestion failures
| where Table == "SilverNYCTaxi"

This gives me the errors for the SilverNYCTaxi table.

The output looks something like this (with selected columns and dummy data).

| OperationId | FailedOn | Details | ErrorCode | Principal | NumberOfSources |
|---|---|---|---|---|---|
| guid | datetime | Stream with ID ‘*****.csv’ has a malformed CSV format | Stream_ClosingQuoteMissing | adapp=xxxxxx | 1 |

Now I can begin to debug the whereabouts of my ingestion and why rows are not entering my table.
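
From here you can narrow the search further. A small sketch - using the columns shown in the output above - that only looks at the last day and groups the failures by error code could be:

// Failures for the destination table within the last day, grouped by error code
.show ingestion failures
| where Table == "SilverNYCTaxi"
| where FailedOn > ago(1d)
| summarize Failures = count() by ErrorCode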

Now to try it out yourself

The above should be enough to get you started with Update Policies and implement them in your own organisation and/or project.
