When working with large datasets in data pipelines, processing every record repeatedly can be inefficient. A common way to optimize this is to load only the changed, or “delta”, records. This technique is referred to as incremental loading, and it lets you process only the new, updated, or deleted records from the source.
In this guide, we’ll walk through how to handle incremental loading of delta records in BigQuery using Dataform. We’ll assume each record carries an operation flag indicating whether it was inserted (I), updated (U), or deleted (D), a convention followed by most CDC (change data capture) tools.
How Incremental Loading Works
Imagine you have a landing zone for incoming delta records. In this example, we’ll call this table delta_table. Over time, as more delta records are appended, the table will grow continuously, serving as a log of all changes made to your data.
Additionally, you maintain another table where only the latest changes from the delta records are applied. This is the table that holds the most up-to-date and accurate version of your data. We’ll refer to this as incrementally_loaded_table.
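To make the setup concrete, here is a minimal sketch of what delta_table could look like. The columns orderId, storeId, delta_operation_flag, and delta_recordstamp are the ones used throughout this guide; orderValue is a hypothetical payload column, and the exact names and types will depend on what your CDC tool emits.
CREATE TABLE your_project.your_dataset.delta_table (
  orderId STRING,               -- part of the primary key
  storeId STRING,               -- part of the primary key
  orderValue NUMERIC,           -- hypothetical business column
  delta_operation_flag STRING,  -- 'I' = insert, 'U' = update, 'D' = delete
  delta_recordstamp TIMESTAMP   -- when the change was captured
);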
Now, let’s walk through the incremental loading process using this setup:
To begin, we need to configure the script’s behavior and define how Dataform will process the code.
config {
  type: "incremental",
  schema: "your_dataset",
  uniqueKey: ["orderId", "storeId"],
  bigquery: {
    partitionBy: "TIMESTAMP_TRUNC(delta_recordstamp, DAY)"
  }
}
In the configuration block, we set the following:
- type: "incremental": This tells Dataform that we are performing an incremental load.
- schema: The dataset where the table will reside. The table name itself comes from the name of the SQLX file, so this definition lives in a file called incrementally_loaded_table.sqlx.
- uniqueKey: A list of your primary key columns. Dataform uses these to merge new rows into existing ones, so duplicates are handled properly during updates and inserts.
- partitionBy: This partitions the table by day, based on the delta_recordstamp column, to improve query performance and reduce cost.
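One more setup note: the fresh CTE further down references the landing table with ref(), so delta_table must be declared in your Dataform project. A minimal declaration could look like the following, assuming it lives in a file such as definitions/sources/delta_table.sqlx (the path and project name are placeholders):
config {
  type: "declaration",
  database: "your_project",
  schema: "your_dataset",
  name: "delta_table"
}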
Now that we’ve configured the Dataform script to handle the process as an incremental load, we need to add some preliminary steps. In this phase, we’ll create a variable that ensures the process only accounts for changes made since the last known incremental load.
pre_operations {
  DECLARE max_delta_recordstamp TIMESTAMP;
  SET max_delta_recordstamp = (
    ${when(incremental(),
      `SELECT TIMESTAMP_TRUNC(TIMESTAMP_SUB(MAX(delta_recordstamp), INTERVAL 1 DAY), DAY) FROM ${self()}`,
      `TIMESTAMP("2000-01-01")`)}
  );
}
In the pre-operations block, we:
- Determine the maximum recordstamp from the previous incremental load: This allows us to track changes that occurred after the last processed record. We subtract one day and truncate to the start of that day, giving a small overlap so that late-arriving records near the boundary are not missed (the deduplication step later makes this overlap safe).
- Use a fallback value: If this is the first time running the incremental load, we fall back to a default timestamp (TIMESTAMP("2000-01-01")), so we process all records initially. If your data contains values older than the year 2000, change this value.
The query retrieves the last known recordstamp and ensures we fetch only records that arrived after that point.
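To make the when(incremental(), ...) helper concrete, this is roughly what the pre-operations compile to in the two cases, assuming the target table is your_project.your_dataset.incrementally_loaded_table (the path is illustrative):
-- First run (the target table does not exist yet): the fallback branch is used
DECLARE max_delta_recordstamp TIMESTAMP;
SET max_delta_recordstamp = (TIMESTAMP("2000-01-01"));

-- Subsequent incremental runs: read the high-water mark from the target table
DECLARE max_delta_recordstamp TIMESTAMP;
SET max_delta_recordstamp = (
  SELECT TIMESTAMP_TRUNC(TIMESTAMP_SUB(MAX(delta_recordstamp), INTERVAL 1 DAY), DAY)
  FROM your_project.your_dataset.incrementally_loaded_table
);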
After we have configured the pre-operations, we can fetch the latest delta. For this, we start by creating a CTE for the fresh data.
WITH fresh AS (
  SELECT
    *
  FROM ${ref("your_dataset", "delta_table")}
  ${when(
    incremental(),
    `WHERE TIMESTAMP_TRUNC(delta_recordstamp, DAY) >= max_delta_recordstamp`)}
),
Here, we:
- Retrieve new or updated records from the source table delta_table.
- Filter based on the recordstamp: In an incremental run, we only fetch records whose recordstamp (truncated to the day) is greater than or equal to max_delta_recordstamp, which ensures that we process only the changes that occurred since the last run. On a first run, the when() helper omits the WHERE clause entirely, as shown in the compiled sketch below.
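Roughly, the two compiled variants of this CTE look like this (the table path is illustrative):
-- Incremental run: only recent delta records are scanned
WITH fresh AS (
  SELECT *
  FROM your_project.your_dataset.delta_table
  WHERE TIMESTAMP_TRUNC(delta_recordstamp, DAY) >= max_delta_recordstamp
),

-- First run: no filter, so the full history of delta records is processed
WITH fresh AS (
  SELECT *
  FROM your_project.your_dataset.delta_table
),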
Now, we can create another CTE that combines the fresh delta records with the data already loaded into the target table.
union_old AS (
  SELECT * FROM fresh
  ${when(
    incremental(),
    `UNION ALL
     SELECT * FROM ${self()}`)}
),
Now that new and previously loaded records sit in the same result set, we need to handle duplicates so that only the latest record per primary key is kept. We create a new CTE that ranks the records by timestamp within each primary key.
ranked_records AS (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY orderId, storeId
      ORDER BY delta_recordstamp DESC
    ) AS rank_num
  FROM union_old
)
To manage duplicates (caused by updates, inserts, and the overlap window):
- We use ROW_NUMBER() to assign a rank to each record within its unique key (orderId, storeId), ordered by delta_recordstamp descending.
- The latest record gets rank 1, allowing us to filter out older duplicates in the next step; the worked example below shows how this plays out.
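For instance, given two changes to the same order and one brand-new order (all values invented for illustration), the ranking would come out as follows:
-- orderId | storeId | delta_operation_flag | delta_recordstamp       | rank_num
-- 1001    | 42      | I                    | 2024-05-01 09:00:00 UTC | 2
-- 1001    | 42      | U                    | 2024-05-02 14:30:00 UTC | 1  <- kept
-- 2002    | 42      | I                    | 2024-05-02 08:15:00 UTC | 1  <- kept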
Now that the records are ranked from newest to oldest, we can write the final SELECT that keeps only the latest version of each record, so the target table stays up to date with the latest data changes.
SELECT
* EXCEPT(rank_num)
FROM ranked_records
WHERE rank_num = 1
Here:
- We select only the latest version of each record (i.e., where rank_num = 1). This ensures we don’t keep outdated versions of the same record.
- The EXCEPT(rank_num) excludes the rank_num column from the final result.
- Because the table is configured with type: "incremental" and a uniqueKey, Dataform merges this result into incrementally_loaded_table, updating existing rows and inserting new ones.
At this point, we have a fresh, updated table with the latest delta changes. However, for many data sources, records can also be deleted. In our current setup, the latest change for a key might be a row marked with operation type 'D' (deleted). Since we don’t want to keep deleted records in our final output, we’ll add a post_operations block to remove those rows from the final table.
post_operations {
  DELETE FROM your_project.your_dataset.incrementally_loaded_table
  WHERE delta_operation_flag = 'D';
}
In the post-operations block:
- We delete records flagged for deletion (delta_operation_flag = 'D'). This ensures that any record marked as deleted in the source is also removed from the destination table.
- This step ensures that deletes captured by the source system are reflected in your BigQuery table as well.
The final code for the incremental loading process will look like this:
config {
  type: "incremental",
  schema: "your_dataset",
  uniqueKey: ["orderId", "storeId"],
  bigquery: {
    partitionBy: "TIMESTAMP_TRUNC(delta_recordstamp, DAY)"
  }
}

pre_operations {
  DECLARE max_delta_recordstamp TIMESTAMP;
  SET max_delta_recordstamp = (
    ${when(incremental(),
      `SELECT TIMESTAMP_TRUNC(TIMESTAMP_SUB(MAX(delta_recordstamp), INTERVAL 1 DAY), DAY) FROM ${self()}`,
      `TIMESTAMP("2000-01-01")`)}
  );
}

WITH fresh AS (
  SELECT
    *
  FROM ${ref("your_dataset", "delta_table")}
  ${when(
    incremental(),
    `WHERE TIMESTAMP_TRUNC(delta_recordstamp, DAY) >= max_delta_recordstamp`)}
),

union_old AS (
  SELECT * FROM fresh
  ${when(
    incremental(),
    `UNION ALL
     SELECT * FROM ${self()}`)}
),

ranked_records AS (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY orderId, storeId
      ORDER BY delta_recordstamp DESC
    ) AS rank_num
  FROM union_old
)

SELECT
  * EXCEPT(rank_num)
FROM ranked_records
WHERE rank_num = 1

post_operations {
  DELETE FROM your_project.your_dataset.incrementally_loaded_table
  WHERE delta_operation_flag = 'D';
}
Implementing incremental loading in BigQuery using Dataform is a powerful and efficient way to manage large datasets, ensuring you process only new, updated, or deleted records rather than running full loads every time.
This can significantly reduce processing time and cost, especially for large-scale data pipelines, and it makes your pipelines more scalable and maintainable over time. Whether you’re managing customer transactions, logs, or real-time events, incremental loading is an essential technique to have in your data engineering toolkit.