Modular Components in Delta Live Tables Pipeline Development
In a Delta Live Tables (DLT) pipeline, four modular components (Template System, Configuration-Driven Development, Utility Functions, and Pipeline Generation) work together to create robust, maintainable, and automated data pipelines. Together they form a framework that standardizes pipeline development, reduces manual effort, and keeps processing consistent across different data sources. Because new pipelines differ only in their parameters, they can be deployed quickly, while parameter-driven templates still allow table-specific customization.
How the Components Work Together
- Template System Module:
  - Functions as the core framework for standardizing SQL pipeline generation
  - Houses both harmonization and non-harmonization templates for different data processing needs
  - Provides a placeholder system for dynamic parameter injection and customization
  - Example:
    CREATE OR REFRESH STREAMING TABLE {table_name}_raw COMMENT 'Raw data from kakao landing zone for {table_name}'
- Configuration-Driven Module:
  - Maintains table-specific configurations in a structured format
  - Defines key parameters such as:
    - Schema definitions
    - Harmonization rules
    - Historization settings
    - Data quality constraints
  - Enables quick onboarding of new data sources without code changes
  - Example:
    TABLE_CONFIG = [{"table_name": "consent_status", "template_name": "dlt_sql_harmonisation_template", "schema": "struct<userId:int,version:string,...>", "historisation_type": "SCD TYPE 1"}]
- Utility Functions Module:
  - Handles notebook management operations:
    - Export/import of notebooks
    - Content manipulation
    - Version control integration
  - Provides configuration management services:
    - Environment-specific settings
    - Path management
    - Security configurations
  - Example:
    def import_notebook(notebook_path, content, language="PYTHON"):
        api_url, api_token = get_databricks_context()
        # Notebook import logic
- Pipeline Generation Module:
  - Orchestrates the interaction between all components through:
    - Template selection based on table configuration
    - Dynamic parameter substitution
    - Pipeline validation and generation
    - Automated deployment processes
  - Performs automated tasks such as:
    for table in TABLE_CONFIG:
        template_content = export_notebook(f"{Path.template_dir}{table['template_name']}")
        processed_content = substitute_parameters(template_content, table)
        import_notebook(f"{Path.target_dir}{table['table_name']}_pipeline", processed_content, language="SQL")
Each component plays a specific role in the pipeline generation process:
- Template System: provides the structural foundation
- Configuration: supplies customization parameters
- Utilities: enable operational functionality
- Generator: orchestrates the entire process
Interaction Among Components
- The Template System serves as the foundation of pipeline generation, providing standardized SQL templates with placeholders for both harmonization and non-harmonization scenarios. These templates establish consistent data processing patterns across all pipelines.
- The Configuration-Driven Module feeds essential parameters into the templates, enabling quick adaptation to new data sources without code changes. These parameters include:
  - Table definitions and schemas
  - Harmonization rules
  - Data quality constraints
  - Historization settings
- The Utility Functions Module facilitates operational tasks throughout the pipeline:
  - Managing notebook operations (export/import)
  - Handling environment-specific configurations
  - Providing error handling and logging services
  - Supporting version control integration
- The Pipeline Generation Module orchestrates the entire process by:
  - Consuming configurations from the Configuration Module
  - Applying parameters to templates from the Template System
  - Using utility functions for notebook management
  - Producing validated and deployment-ready pipelines
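The templates mix two kinds of markers: {name} placeholders that the generator fills from TABLE_CONFIG, and ${path} / ${bad_files_path} references that DLT resolves from the pipeline configuration at run time. One possible validation step, sketched below under the assumption that placeholders are plain identifiers in single braces, is to confirm that every {name} marker has a matching configuration key before a notebook is generated. find_placeholders and missing_parameters are hypothetical helpers, not part of the original code.

```python
import re

# {identifier} markers are filled by the generator; ${identifier} markers are
# assumed to be DLT pipeline parameters and are deliberately skipped.
_PLACEHOLDER = re.compile(r"(?<!\$)\{([A-Za-z_]\w*)\}")


def find_placeholders(template: str) -> set[str]:
    """Return the generator placeholders used in a template."""
    return set(_PLACEHOLDER.findall(template))


def missing_parameters(template: str, table_config: dict) -> set[str]:
    """Return placeholders that the table configuration does not supply."""
    return find_placeholders(template) - set(table_config)


if __name__ == "__main__":
    template = (
        "CREATE OR REFRESH STREAMING TABLE {table_name}_raw\n"
        'FROM cloud_files("${path}{table_name}/", "json", map("schema", "{schema}"))\n'
        "KEYS({historisation_keys})"
    )
    config = {"table_name": "meal", "schema": "struct<id:int>"}
    print(sorted(missing_parameters(template, config)))  # ['historisation_keys']
```

Running such a check for every entry before calling the notebook import turns a failed pipeline update into an early, readable error.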
Together, these components create a streamlined pipeline generation system that reduces complexity and ensures consistency. This modular architecture enables rapid deployment of new pipelines while maintaining flexibility to accommodate different data processing requirements, environments (dev, test, prod), and business needs.
Below is a detailed exploration of each component, describing its specific functionalities, implementation patterns, and contribution to the overall pipeline automation and efficiency.
Key Components of Reusable Architecture
1. Template System
The template system is the foundation of the reusable DLT pipeline architecture: it provides standardized SQL templates that are populated with table-specific configuration values. Two template types, harmonization and non-harmonization, cover the different data processing requirements while keeping implementation patterns consistent across all pipelines.
Harmonization Template
- Used for tables requiring value standardization
- Examples: user_profile, insulin, consent_status
- Handles reference data mapping by joining each record to a lookup table (common_bronze.{harmonised_attribute}_harmonization)
Reference Code
-- dlt sql template
-- Parameters
-- {table_name} name of the table to load ex: consent_status
-- {schema} schema to load ex: struct<userId:int,version:string,status:string,createdAt:string>
-- {harmonised_attribute} - name of the harmonised attribute ex: consent_status
-- {attribute_to_harmonise} - name of the attribute to harmonise ex: status
-- {bronze_attributes_to_silver} - business attribute translation from bronze to silver, must end with a comma -- ex: userId as user_id, version as version, createdAt as created_at,
-- {historisation_keys} - key columns to historize ex: user_id
-- {historisation_type} - historization type (SCD TYPE 1 or SCD TYPE 2) ex: SCD TYPE 1
-- ${path}, ${bad_files_path} - pipeline configuration parameters resolved by DLT at run time
-- STEP 1 LOAD RAW DATA FROM LANDING ZONE
CREATE OR REFRESH STREAMING TABLE {table_name}_raw
COMMENT 'Raw data from kakao landing zone for {table_name}'
TBLPROPERTIES('quality' = 'bronze',
'source' = 'kakao')
PARTITIONED BY (sys_source_file_name)
AS
SELECT
uuid() as sys_surrogate_pk,
data.*,
pipeline.origin.update_id as sys_job_id, -- pipeline run id.
_metadata.file_name as sys_source_file_name,
pipeline.timestamp as sys_received_at,
digital_health_catalog_bronze_dev.kakao_dev_dlt.get_modified_at(_metadata.file_name, sys_source_file_name, 'file_name') as sys_modified_at
FROM cloud_files(
"${path}{table_name}/","json",
map("schema", "{schema}",
"badFilesPath","${bad_files_path}")
) as data
CROSS JOIN sys_pipeline_last_run pipeline;
-- STEP 2 VALIDATE DATA QUALITY & HARMONIZE --
-- Python has deduplication functions: dropDuplicates() can be efficient for batch processing, while dropDuplicatesWithinWatermark() is better suited for streaming.
CREATE OR REFRESH STREAMING TABLE {table_name}
(
CONSTRAINT mandatory_createdAt EXPECT (created_at IS NOT NULL),
CONSTRAINT mandatory_userId EXPECT (user_id IS NOT NULL) ON VIOLATION DROP ROW,
CONSTRAINT can_not_harmonize_{harmonised_attribute} EXPECT ({harmonised_attribute} IS NOT NULL) ON VIOLATION DROP ROW,
CONSTRAINT harmonized_{harmonised_attribute} EXPECT ({harmonised_attribute} IS NOT NULL and {harmonised_attribute} = original_{harmonised_attribute})
)
COMMENT 'Validated, harmonized data for {table_name}'
TBLPROPERTIES('quality' = 'bronze',
'source' = 'kakao')
AS
SELECT
sys_surrogate_pk,
{bronze_attributes_to_silver}
dth.harmonized_value as {harmonised_attribute},
up.{attribute_to_harmonise} as original_{harmonised_attribute},
sys_modified_at, sys_source_file_name, sys_received_at
FROM STREAM(LIVE.{table_name}_raw) as up
LEFT JOIN common_bronze.{harmonised_attribute}_harmonization dth ON up.{attribute_to_harmonise} = dth.value;
-- STEP 3 INTEGRATE TO SILVER --
CREATE OR REFRESH STREAMING TABLE digital_health_catalog_silver_dev.kakao_dev_dlt.{table_name}
COMMENT 'Validated, harmonized, deduplicated and historised data for {table_name}'
TBLPROPERTIES('quality' = 'silver',
'source' = 'kakao');
APPLY CHANGES INTO digital_health_catalog_silver_dev.kakao_dev_dlt.{table_name}
FROM STREAM(LIVE.{table_name})
KEYS({historisation_keys})
--APPLY AS DELETE WHEN sys_is_deleted = true
SEQUENCE BY sys_modified_at
COLUMNS * EXCEPT (original_{harmonised_attribute})
STORED AS {historisation_type};
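Step 2 combines a LEFT JOIN against the harmonization lookup with two kinds of expectations: a plain EXPECT keeps the record and only records the violation, while ON VIOLATION DROP ROW removes it. The plain-Python sketch below is a simplified approximation of that behaviour for the consent_status instance of the template, useful for reasoning about the rules outside Databricks. The lookup contents, the record shape (already renamed to silver column names), and the function names are hypothetical; in the real pipeline DLT evaluates the expectations and reports the counts in the pipeline event log.

```python
from typing import Callable

Row = dict
# (name, predicate, drop_row): drop_row=True mirrors ON VIOLATION DROP ROW,
# drop_row=False mirrors a plain EXPECT that keeps the row and only counts it.
Expectation = tuple[str, Callable[[Row], bool], bool]


def harmonize(rows: list[Row], lookup: dict[str, str], source: str, target: str) -> list[Row]:
    """Mimic the LEFT JOIN: unmapped source values become None (SQL NULL)."""
    result = []
    for row in rows:
        out = {k: v for k, v in row.items() if k != source}
        out[target] = lookup.get(row[source])
        out[f"original_{target}"] = row[source]
        result.append(out)
    return result


def apply_expectations(rows: list[Row], expectations: list[Expectation]):
    """Return (kept_rows, violation_counts); every expectation is checked on every row."""
    kept, violations = [], {name: 0 for name, _, _ in expectations}
    for row in rows:
        drop = False
        for name, predicate, drop_row in expectations:
            if not predicate(row):
                violations[name] += 1
                drop = drop or drop_row
        if not drop:
            kept.append(row)
    return kept, violations


# The Step 2 expectations instantiated with harmonised_attribute = consent_status.
CONSENT_STATUS_EXPECTATIONS: list[Expectation] = [
    ("mandatory_createdAt", lambda r: r["created_at"] is not None, False),
    ("mandatory_userId", lambda r: r["user_id"] is not None, True),
    ("can_not_harmonize_consent_status", lambda r: r["consent_status"] is not None, True),
    ("harmonized_consent_status",
     lambda r: r["consent_status"] is not None and r["consent_status"] == r["original_consent_status"],
     False),
]

if __name__ == "__main__":
    lookup = {"AGREED": "AGREED", "agreed": "AGREED"}  # hypothetical lookup contents
    raw = [
        {"user_id": 1, "created_at": "2024-01-01", "status": "AGREED"},
        {"user_id": 2, "created_at": "2024-01-02", "status": "agreed"},     # harmonized, kept with a warning
        {"user_id": 3, "created_at": None, "status": "unknown"},            # unmapped, dropped
        {"user_id": None, "created_at": "2024-01-03", "status": "AGREED"},  # no user, dropped
    ]
    kept, violations = apply_expectations(harmonize(raw, lookup, "status", "consent_status"),
                                           CONSENT_STATUS_EXPECTATIONS)
    print([r["user_id"] for r in kept])  # [1, 2]
    print(violations)
```

The harmonized_ expectation acts as a monitor: it never drops data, but its violation count shows how many values the lookup actually changed.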
Non-Harmonization Template
- Used for simpler data flows
- Examples: meal, medication, smart_pen
- Streamlined processing logic
Reference Code
-- dlt sql template
-- Parameters
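-- {table_name} name of the table to load ex: medication
-- {schema} schema to load ex: struct<id:int, userId:int, medicationType:string, intakeAt:string>
-- {bronze_attributes_to_silver} - business attribute translation from bronze to silver, must end with a comma -- ex: id, userId as user_id, medicationType as medication_type, intakeAt as intake_at,
-- {historisation_keys} - key columns to historize ex: id
-- {historisation_type} - historization type (SCD TYPE 1 or SCD TYPE 2) ex: SCD TYPE 1
-- ${path}, ${bad_files_path} - pipeline configuration parameters resolved by DLT at run time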
-- STEP 1 LOAD RAW DATA FROM LANDING ZONE
CREATE OR REFRESH STREAMING TABLE {table_name}_raw
COMMENT 'Raw data from kakao landing zone for {table_name}'
TBLPROPERTIES('quality' = 'bronze',
'source' = 'kakao')
PARTITIONED BY (sys_source_file_name)
AS
SELECT
uuid() as sys_surrogate_pk,
data.*,
pipeline.origin.update_id as sys_job_id, -- pipeline run id.
_metadata.file_name as sys_source_file_name,
pipeline.timestamp as sys_received_at,
digital_health_catalog_bronze_dev.kakao_dev_dlt.get_modified_at(_metadata.file_name, sys_source_file_name, 'file_name') as sys_modified_at
FROM cloud_files(
"${path}{table_name}/","json",
map("schema", "{schema}",
"badFilesPath","${bad_files_path}")
) as data
CROSS JOIN sys_pipeline_last_run pipeline;
-- STEP 2 VALIDATE DATA QUALITY --
-- Python has deduplication functions: dropDuplicates() can be efficient for batch processing, while dropDuplicatesWithinWatermark() is better suited for streaming.
CREATE OR REFRESH STREAMING TABLE {table_name}
(
CONSTRAINT mandatory_createdAt EXPECT (created_at IS NOT NULL),
CONSTRAINT mandatory_userId EXPECT (user_id IS NOT NULL) ON VIOLATION DROP ROW
)
COMMENT 'Validated data for {table_name}'
TBLPROPERTIES('quality' = 'bronze',
'source' = 'kakao')
AS
SELECT
sys_surrogate_pk,
{bronze_attributes_to_silver}
sys_modified_at, sys_source_file_name, sys_received_at
FROM STREAM(LIVE.{table_name}_raw) as up;
-- STEP 3 INTEGRATE TO SILVER --
CREATE OR REFRESH STREAMING TABLE digital_health_catalog_silver_dev.kakao_dev_dlt.{table_name}
COMMENT 'Validated, deduplicated and historised data for {table_name}'
TBLPROPERTIES('quality' = 'silver',
'source' = 'kakao');
APPLY CHANGES INTO digital_health_catalog_silver_dev.kakao_dev_dlt.{table_name}
FROM STREAM(LIVE.{table_name})
KEYS({historisation_keys})
--APPLY AS DELETE WHEN sys_is_deleted = true
SEQUENCE BY sys_modified_at
COLUMNS *
STORED AS {historisation_type};
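Both templates end with APPLY CHANGES INTO ... STORED AS {historisation_type}, so the configured value decides whether the silver table keeps only the latest record per key (SCD TYPE 1) or a full history (SCD TYPE 2, which DLT tracks in __START_AT and __END_AT columns holding SEQUENCE BY values). The in-memory sketch below illustrates only those two semantics for a single batch, keyed on historisation_keys and ordered by sys_modified_at; it ignores deletes, TRACK HISTORY options, and the streaming state DLT maintains. The function names and the sample diabetes_type values are hypothetical.

```python
from itertools import groupby
from operator import itemgetter


def _key(row: dict, keys: list[str]) -> tuple:
    return tuple(row[k] for k in keys)


def scd_type_1(changes: list[dict], keys: list[str], sequence_by: str) -> list[dict]:
    """Latest record per key wins, ordered by the sequence column."""
    latest: dict[tuple, dict] = {}
    for row in sorted(changes, key=itemgetter(sequence_by)):
        latest[_key(row, keys)] = row
    return list(latest.values())


def scd_type_2(changes: list[dict], keys: list[str], sequence_by: str) -> list[dict]:
    """Every record becomes a version, closed by the next version's sequence value."""
    ordered = sorted(changes, key=lambda r: (_key(r, keys), r[sequence_by]))
    history = []
    for _, group in groupby(ordered, key=lambda r: _key(r, keys)):
        versions = list(group)
        for current, nxt in zip(versions, versions[1:] + [None]):
            history.append({**current,
                            "__START_AT": current[sequence_by],
                            "__END_AT": nxt[sequence_by] if nxt is not None else None})
    return history


if __name__ == "__main__":
    changes = [
        {"user_id": 1, "diabetes_type": "TYPE_1", "sys_modified_at": "2024-01-01"},
        {"user_id": 1, "diabetes_type": "TYPE_2", "sys_modified_at": "2024-02-01"},
        {"user_id": 2, "diabetes_type": "TYPE_2", "sys_modified_at": "2024-01-15"},
    ]
    print(scd_type_1(changes, ["user_id"], "sys_modified_at"))
    for version in scd_type_2(changes, ["user_id"], "sys_modified_at"):
        print(version["user_id"], version["__START_AT"], version["__END_AT"])
```

This is why user_profile is configured as SCD TYPE 2: a change of diabetes type keeps the earlier value as a closed version instead of overwriting it.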
2. Configuration-Driven Development
Table Configuration Structure: each table is defined by one entry in TABLE_CONFIG, which provides:
- Standardized implementation
- Reduced code duplication
- Consistent error handling
- Simplified maintenance
Reference Code
base_path = "/Workspace/Users/adminpepj@novonordisk.com/DHO-PoC1-Playground/src/"

class Path:
    # base_path already ends with "/", so no extra separator is added here
    template_dir = f"{base_path}conf/template/"
    target_dir = f"{base_path}generated_sql_pipelines/"
TABLE_CONFIG = [
    {"table_name": "consent_status",
     "template_name": "dlt_sql_harmonisation_template",
     "schema": "struct<userId:int,version:string,status:string,createdAt:string>",
     "harmonised_attribute": "consent_status",
     "attribute_to_harmonise": "status",
     "bronze_attributes_to_silver": "userId as user_id, version as version, createdAt as created_at,",
     "historisation_keys": "user_id",
     "historisation_type": "SCD TYPE 1"},
    {"table_name": "user_profile",
     "template_name": "dlt_sql_harmonisation_template",
     "schema": "struct<userId:int, birthYear:int, diabetesType:string, appVersion:string, osVersion:string, createdAt:string>",
     "harmonised_attribute": "diabetes_type",
     "attribute_to_harmonise": "diabetesType",
     "bronze_attributes_to_silver": "userId as user_id, birthYear as birth_year, appVersion as app_version, osVersion as os_version, createdAt as created_at,",
     "historisation_keys": "user_id",
     "historisation_type": "SCD TYPE 2"},
    {"table_name": "insulin",
     "template_name": "dlt_sql_harmonisation_template",
     "schema": "struct<id:int, userId:int, smartPenId:int, duid:string, name:string, insulinType:string, dose:float, injectedAt:string>",
     "harmonised_attribute": "insulin_type",
     "attribute_to_harmonise": "insulinType",
     "bronze_attributes_to_silver": "up.id, userId as user_id, smartPenId as smart_pen_id, duid as duid, name as name, dose as dose, injectedAt as injected_at,",
     "historisation_keys": "id",
     "historisation_type": "SCD TYPE 1"},
    {"table_name": "meal",
     "template_name": "dlt_sql_noharmonisation_template",
     "schema": "struct<id:int, userId:int, startAt:string, kcal:float, carbohydrate:float, sugar:float, protein:float, fat:float, transFat:float, cholesterol:float, salt:float>",
     "bronze_attributes_to_silver": "id, userId as user_id, startAt as start_at, kcal as kcal, carbohydrate as carbohydrate, sugar as sugar, protein as protein, fat as fat, transFat as trans_fat, cholesterol as cholesterol, salt as salt,",
     "historisation_keys": "id",
     "historisation_type": "SCD TYPE 1"},
    {"table_name": "medication",
     "template_name": "dlt_sql_noharmonisation_template",
     "schema": "struct<id:int, userId:int, medicationType:string, intakeAt:string>",
     "harmonised_attribute": "medication_type",
     "attribute_to_harmonise": "medicationType",
     "bronze_attributes_to_silver": "id, userId as user_id, medicationType as medication_type, intakeAt as intake_at,",
     "historisation_keys": "id",
     "historisation_type": "SCD TYPE 1"},
    {"table_name": "smart_pen",
     "template_name": "dlt_sql_noharmonisation_template",
     "schema": "struct<id:int, userId:int, deviceInstanceId:string, insulinPen:string, serialNumber:string, hardwareVersion:string, softwareVersion:string, firmwareVersion:string, createdAt:string>",
     "harmonised_attribute": "smart_pen",
     "attribute_to_harmonise": "insulinPen",
     "bronze_attributes_to_silver": "id, userId as user_id, deviceInstanceId as device_instance_id, insulinPen as insulin_pen, serialNumber as serial_number, hardwareVersion as hardware_version, softwareVersion as software_version, firmwareVersion as firmware_version, createdAt as created_at,",
     "historisation_keys": "id",
     "historisation_type": "SCD TYPE 1"},
]
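Because every entry is plain data, mistakes in TABLE_CONFIG usually surface only when the generated pipeline runs. A lightweight check before generation can catch some of them early, such as a missing comma between two attribute expressions or a misspelled source column. The sketch below is hypothetical (validate_table_config, schema_fields, and the rule set are illustrative): it assumes the two template names used above, flat struct schemas, and attribute expressions of the form column, up.column, or column as alias, and it requires the harmonisation keys only for the harmonisation template.

```python
import re

REQUIRED_KEYS = {"table_name", "template_name", "schema", "bronze_attributes_to_silver",
                 "historisation_keys", "historisation_type"}
HARMONISATION_KEYS = {"harmonised_attribute", "attribute_to_harmonise"}
HISTORISATION_TYPES = {"SCD TYPE 1", "SCD TYPE 2"}
# "column", "up.column", or either followed by "as alias"
_ATTRIBUTE = re.compile(r"^(?:up\.)?([A-Za-z_]\w*)(?:\s+as\s+[A-Za-z_]\w*)?$", re.IGNORECASE)


def schema_fields(schema: str) -> set[str]:
    """Field names of a flat struct<name:type, ...> string (nested types not handled)."""
    text = schema.strip()
    if not (text.startswith("struct<") and text.endswith(">")):
        raise ValueError(f"not a struct schema: {schema!r}")
    return {field.split(":", 1)[0].strip() for field in text[len("struct<"):-1].split(",")}


def validate_table_config(entry: dict) -> list[str]:
    """Return a list of problems found in one TABLE_CONFIG entry (empty if none)."""
    harmonised = entry.get("template_name") == "dlt_sql_harmonisation_template"
    missing = REQUIRED_KEYS - set(entry)
    if harmonised:
        missing |= HARMONISATION_KEYS - set(entry)
    if missing:
        return [f"missing keys: {sorted(missing)}"]

    errors = []
    if entry["historisation_type"] not in HISTORISATION_TYPES:
        errors.append(f"unknown historisation_type {entry['historisation_type']!r}")
    fields = schema_fields(entry["schema"])
    if harmonised and entry["attribute_to_harmonise"] not in fields:
        errors.append(f"attribute_to_harmonise {entry['attribute_to_harmonise']!r} not in schema")
    attributes = entry["bronze_attributes_to_silver"].strip()
    if not attributes.endswith(","):
        # The templates append further columns directly after this placeholder.
        errors.append("bronze_attributes_to_silver must end with a comma")
    for expr in (part.strip() for part in attributes.split(",")):
        if not expr:
            continue
        match = _ATTRIBUTE.match(expr)
        if match is None:
            errors.append(f"malformed attribute expression {expr!r}")
        elif match.group(1) not in fields:
            errors.append(f"column {match.group(1)!r} not in schema")
    return errors
```

A generator could call validate_table_config for every entry and stop before touching the workspace if any entry reports problems.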
3. Utility Functions
Notebook Management
The notebook management functions, export_notebook and import_notebook, export and import Databricks notebooks programmatically through the Workspace REST API. They enable automated generation and deployment of pipeline notebooks while preserving version control and content management capabilities.
Reference Code
import base64

import requests
from databricks.sdk.runtime import *  # provides dbutils when this module is imported from a notebook


def get_databricks_context():
    """
    Retrieve Databricks workspace context including API URL and token.
    """
    context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    api_url = context.apiUrl().get()
    api_token = context.apiToken().get()
    return api_url, api_token


def export_notebook(notebook_path, format="SOURCE"):
    """Export a workspace notebook and return its source as text."""
    api_url, api_token = get_databricks_context()
    headers = {
        "Authorization": f"Bearer {api_token}"
    }
    url = f"{api_url}/api/2.0/workspace/export"
    params = {
        "path": notebook_path,
        "format": format
    }
    response = requests.get(url, headers=headers, params=params)
    if response.status_code != 200:
        raise RuntimeError(f"Notebook export failed: {response.text}")
    # The API returns the notebook content base64-encoded
    return base64.b64decode(response.json()["content"]).decode("utf-8")


def import_notebook(notebook_path, content, language="PYTHON", overwrite=True):
    """Create or overwrite a workspace notebook from source text."""
    api_url, api_token = get_databricks_context()
    headers = {
        "Authorization": f"Bearer {api_token}"
    }
    url = f"{api_url}/api/2.0/workspace/import"
    payload = {
        "path": notebook_path,
        "content": base64.b64encode(content.encode("utf-8")).decode("utf-8"),  # Base64 encode content
        "overwrite": overwrite,
        "format": "SOURCE",
        "language": language.upper()  # e.g., "PYTHON", "SCALA", "SQL", "R"
    }
    response = requests.post(url, headers=headers, json=payload)
    if response.status_code != 200:
        raise RuntimeError(f"Notebook import failed: {response.text}")
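The two REST calls differ only in how content crosses the wire: the Workspace API expects notebook source base64-encoded on import and returns it base64-encoded on export. Separating that encoding from the HTTP call makes it testable without a workspace. The helpers below, build_import_payload and decode_export_response, are a hypothetical refactoring, not part of the original module; the sketch defaults language to "SQL" because the generated DLT notebooks are SQL, whereas import_notebook defaults to PYTHON.

```python
import base64


def build_import_payload(notebook_path: str, content: str, language: str = "SQL",
                         overwrite: bool = True) -> dict:
    """Request body for POST /api/2.0/workspace/import with format SOURCE."""
    return {
        "path": notebook_path,
        "content": base64.b64encode(content.encode("utf-8")).decode("ascii"),
        "overwrite": overwrite,
        "format": "SOURCE",
        "language": language.upper(),
    }


def decode_export_response(body: dict) -> str:
    """Notebook source from the JSON body returned by /api/2.0/workspace/export."""
    return base64.b64decode(body["content"]).decode("utf-8")


if __name__ == "__main__":
    payload = build_import_payload("/tmp/meal_pipeline", "SELECT 1;", language="sql")
    print(payload["language"], decode_export_response({"content": payload["content"]}))
```

With this split, export_notebook and import_notebook reduce to sending the payload and checking the status code.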
Configuration Management
The get_databricks_configuration function centralizes environment-aware configuration. From a mode and an environment (dev by default), it derives the catalog, schema, table, and volume names and the secret scope, sets operational parameters such as the vacuum retention period, and lists the registered JSON input types, so that every deployment uses consistent naming.
Reference Code
"""databricks config module"""
from typing import Any


def get_databricks_configuration(mode: str, env: str = "dev") -> dict[Any, Any]:
    """
    Returns global configuration for databricks workflow to manage infrastructure
    :param mode: processing mode used in schema, table, and volume names
    :param env: environment type (e.g. dev, test, prod)
    :return: configuration for databricks workflow
    """
    catalog = f"digital_health_catalog_bronze_{env}"
    schema_base = f"kakao_{mode}_{env}"
    registered_input = [
        "consentStatus",
        "medication",
        "userProfile",
        "insulin",
        "smartPen",
        "walking",
        "meal",
        "smartPenErrorEvents",
        "workout",
    ]
    return {
        "databricks": {
            "secrets_scope": f"kakao-integration-{env}",
            "vacuum_retain_hours": 7 * 24,  # one week
            "catalog": catalog,
            "schema_base": schema_base,
            "bronze_table": f"bronze_kakao_{mode}_{env}",
            "landing_volume": f"landing_kakao_{mode}_{env}",
            "checkpoint_volume": f"checkpoint_kakao_{mode}_{env}",
            "bad_files_volume": f"bad_files_kakao_{mode}_{env}",
            "json_types": registered_input,
        }
    }
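The templates read ${path} and ${bad_files_path} from the DLT pipeline configuration rather than from TABLE_CONFIG. One way to tie those parameters to this module is to derive them from the volume names it returns. The sketch below assumes, hypothetically, that the landing and bad-files volumes live in the schema named by schema_base inside catalog, and uses the Unity Catalog volume path layout /Volumes/<catalog>/<schema>/<volume>/; the function name and the mapping are illustrative, not part of the original code.

```python
def pipeline_parameters(config: dict) -> dict[str, str]:
    """Map get_databricks_configuration() output to the ${...} keys used by the templates."""
    db = config["databricks"]

    def volume_path(volume: str) -> str:
        # Unity Catalog volumes are addressed as /Volumes/<catalog>/<schema>/<volume>/
        return f"/Volumes/{db['catalog']}/{db['schema_base']}/{volume}/"

    return {
        # Trailing "/" matters: the templates append {table_name} directly to ${path}.
        "path": volume_path(db["landing_volume"]),
        "bad_files_path": volume_path(db["bad_files_volume"]),
    }


if __name__ == "__main__":
    sample = {"databricks": {"catalog": "digital_health_catalog_bronze_dev",
                             "schema_base": "kakao_dlt_dev",
                             "landing_volume": "landing_kakao_dlt_dev",
                             "bad_files_volume": "bad_files_kakao_dlt_dev"}}
    print(pipeline_parameters(sample))
```

The resulting dictionary could be placed in the configuration section of the DLT pipeline settings, keeping naming in one place.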
4. Pipeline Generation based on the Template
The pipeline generation process iterates through table configurations, applying specific parameters to standardized templates to create customized SQL pipelines. This automated approach ensures consistent implementation while allowing for table-specific customizations through parameter substitution in the template content.
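A minimal sketch of that loop, assuming the templates are exported as text and each {key} marker is replaced with the matching TABLE_CONFIG value. It uses str.replace per key rather than str.format, because the templates also contain ${path} and ${bad_files_path}, which str.format would try (and fail) to fill. substitute_parameters and generate_pipelines are hypothetical names, and the export and import callables are injected so the loop can be exercised with stubs instead of the Workspace API.

```python
from typing import Callable, Iterable


def substitute_parameters(template: str, table_config: dict) -> str:
    """Replace each {key} marker with its configured value.

    ${path}-style markers survive as long as no config key is named
    path or bad_files_path.
    """
    result = template
    for key, value in table_config.items():
        result = result.replace("{" + key + "}", str(value))
    return result


def generate_pipelines(table_configs: Iterable[dict],
                       export_fn: Callable[[str], str],
                       import_fn: Callable[[str, str], None],
                       template_dir: str,
                       target_dir: str) -> list[str]:
    """Render one notebook per table entry and return the generated notebook paths."""
    generated = []
    for table in table_configs:
        template = export_fn(f"{template_dir}{table['template_name']}")
        target = f"{target_dir}{table['table_name']}_pipeline"
        import_fn(target, substitute_parameters(template, table))
        generated.append(target)
    return generated
```

With the utilities defined earlier, a call could look like generate_pipelines(TABLE_CONFIG, export_notebook, lambda path, sql: import_notebook(path, sql, language="SQL"), Path.template_dir, Path.target_dir). Passing language="SQL" matters because import_notebook defaults to PYTHON.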