Modular Components of a Databricks Pipeline#
In a Databricks pipeline, various modular components—including Config, Infrastructure, Utilities, and Common—work together to build scalable, maintainable, and reusable pipelines. Each of these components plays a distinct role but is tightly integrated to abstract complexity and enable flexibility across environments (e.g., dev, test, prod).
How the Components Work Together#
- Config Module:
  - Acts as a centralized blueprint for defining key parameters like paths, schemas, catalogs, and validation rules.
  - Provides environment-specific settings (e.g., for dev, test, or prod) to enable dynamic behavior without hardcoding.
  - Supplies inputs to other components to construct directories, configure paths, and perform validations dynamically.
- Infrastructure Module:
  - Uses configuration settings to provision the necessary resources, such as directories for landing zones, checkpoints, and processed data.
  - Automates the creation of schemas and tables in the Databricks Unity Catalog or Delta Lake environment, ensuring the pipeline is ready for data ingestion.
- Utilities Module:
  - Provides reusable helper functions for tasks such as logging, data validation, path generation, and metadata enrichment.
  - Supports both the Infrastructure and Common Modules by supplying pre-built utilities that simplify validation processes and enable traceability.
- Common Module:
  - Acts as the backbone for shared functions, such as SQL execution, dataset transformations, and error handling, which are used across the entire pipeline.
  - Relies on the Utilities Module for logging and reusable logic.
  - Incorporates configuration-driven inputs to dynamically execute ingestion workflows.
Interaction Among Components#
- The Config Module is the starting point for the pipeline, supplying dynamically defined inputs such as paths, schemas, and table definitions.
- The Infrastructure Module consumes these inputs to create all necessary resources, such as directories, schemas, and tables, ensuring the operational readiness of the pipeline environment.
- The Utilities Module handles dynamic validation, logging, and metadata enrichment, which are integrated into all stages of data processing and ingestion.
- The Common Module orchestrates the flow of the pipeline by combining the outputs of the Utilities Module, leveraging the pre-built resources from the Infrastructure Module, and adhering to environment-specific configurations defined in the Config Module.
Together, these components abstract the complexity of pipeline construction and ensure modularity. This modular design makes it easy to adjust or scale the pipeline as new data sources, environments (e.g., dev, test, prod), or business requirements emerge.
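As a rough illustration of the flow described above, the sketch below wires four stand-in functions together. Every function name and body here is a hypothetical placeholder chosen for the example, not the playbook's actual modules.

```python
from typing import Any


def load_config(env: str) -> dict[str, Any]:
    """Config: hypothetical stand-in returning environment-specific settings."""
    return {"schema": f"demo_{env}", "paths": ["incoming", "failed", "checkpoint"]}


def log(message: str, sink: list[str]) -> None:
    """Utilities: hypothetical logging helper that records messages in a list."""
    sink.append(message)


def provision(conf: dict[str, Any], sink: list[str]) -> list[str]:
    """Infrastructure: derive the resources to create from the configuration."""
    resources = [f"{conf['schema']}/{path}" for path in conf["paths"]]
    log(f"provisioned {len(resources)} resources", sink)
    return resources


def run_pipeline(env: str) -> tuple[list[str], list[str]]:
    """Common: orchestrate config -> infrastructure, logging via utilities."""
    messages: list[str] = []
    conf = load_config(env)
    resources = provision(conf, messages)
    log(f"pipeline ready for {env}", messages)
    return resources, messages


resources, messages = run_pipeline("dev")
print(resources)
print(messages)
```

Switching the argument from "dev" to another environment changes every derived name without touching the other functions, which is the property the modular split is meant to provide.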
Below is a detailed explanation of each component, outlining its role, responsibilities, and how it contributes to the pipeline's flexibility and efficiency.
Configuration#
The configuration file is a critical component in the Databricks pipeline, serving the following roles:
- Acts as the blueprint for pipeline behavior by defining environment-specific settings and static parameters.
- These include paths for raw, processed, and checkpointed data, as well as table definitions and schema names.
- Enables customization for different environments (e.g., dev, test, prod), ensuring flexibility in pipeline execution.
- Promotes consistency across environments, reducing duplication and errors by standardizing configurations.
Key Components of the Config File#
The main components defined in the Config file include:
- Paths:
  - Define where raw, processed, checkpointed, and failed files are stored.
  - Example: `landing_volume_path`: Path where raw files arrive for processing.
- Catalogs and Schemas:
  - Specify database structures for organizing pipeline assets such as tables and views.
  - Example: `databricks.catalog` or `schema_base`, tailored to the environment.
- Table Definitions:
  - Specify the raw tables to be created and managed during the pipeline.
  - Example: `json_types`: A list of table names like `user_profile` or `workout`.
- Validation Rules:
  - Define quality checks such as schema validation and primary/foreign key constraints.
  - Example: `"PKS": True` enables primary key validation for the environment.
How the Config File Acts as Input and Blueprint#
The Config file provides critical inputs that guide every aspect of pipeline execution. It acts as a blueprint in the following ways:
- Parameterizing the Workflow:
  - Dynamically adjusts key parameters like schema names, paths, and quality checks based on the environment (`dev`, `test`, or `prod`).
- Guiding Infrastructure Setup:
  - Supplies paths and schemas used by Infrastructure to create directories and tables dynamically.
- Serving as a Single Source of Truth:
  - Centralizes reusable configurations, making the pipeline both consistent and adaptable.
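To make the parameterization idea concrete, here is a minimal sketch in which one function derives environment-specific names from two inputs. The helper `build_settings` and the `demo_` names are assumptions made for illustration only.

```python
def build_settings(env: str, mode: str) -> dict[str, str]:
    """Hypothetical helper: derive environment-specific names from two inputs."""
    return {
        "catalog": f"demo_catalog_{env}",
        "schema": f"demo_{env}_{mode}",
        "incoming_path": "provider/incoming",  # static, shared by all environments
    }


dev = build_settings("dev", "batch")
prod = build_settings("prod", "batch")

print(dev["schema"])
print(prod["schema"])
```

Because every consumer reads from the returned dictionary, a naming change is made in one place rather than in each module.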
Detailed Explanation of Configuration ℹ️#
```python
from typing import Any

from conf.constants import Constants


def get_databricks_configuration(mode: str, env: str = "dev") -> dict[Any, Any]:
    """
    Returns global configuration for Databricks workflows to manage infrastructure.

    :param mode: Specifies the workflow mode (e.g., "batch", "stream").
    :param env: Specifies the runtime environment (e.g., "dev", "tst", "prod").
    :return: A dictionary containing the configuration for the Databricks workflow.
    """
    # Define catalog and schema base names dynamically based on environment and mode
    catalog = f"digital_health_catalog_bronze_{env}"  # Catalog for managing data
    schema_base = f"kakao_{env}_{mode}"  # Base for schemas

    # Define the data provider
    data_provider = "kakao"  # Data provider prefix

    # Define tables to handle in the workflow
    registered_input = [
        "consent_status",  # Table for storing consent data
        "medication",  # Table for medication details
        "user_profile",  # Table for user profile information
        "insulin",  # Table for insulin-related data
        "smart_pen",  # Table for smart pen device data
        "walking",  # Table for walking data
        "meal",  # Table for meal tracking
        "smart_pen_error_events",  # Table for logging errors with smart pen devices
        "workout",  # Table for workout data
    ]

    # Define quality checks and their enablement status by environment
    checks_enablement = {
        "dev": {  # Development environment: enable most checks
            Constants.BKS: False,  # Disable book-specific checks
            Constants.PKS: True,  # Enable primary key validation
            Constants.REFS: True,  # Enable reference checks
            Constants.FKS: False,  # Disable foreign key validation
            Constants.SCHEMA: True,  # Enable schema validation
            Constants.SCD: True,  # Enable Slowly Changing Dimension (SCD) checks
            Constants.CLEAR_STATUS: True,  # Allow clearing all quality flags
            Constants.VALUE_MAX_LENGTH: True,  # Enable max length validation
        },
        "tst": {},  # Test environment: No checks defined yet
        "val": {},  # Validation environment: No checks defined yet
        "prod": {},  # Production environment: No checks defined yet
    }

    # Return the full configuration dictionary for the Databricks workflow
    return {
        "databricks": {
            # Secrets scope for accessing sensitive credentials/resources
            "secrets_scope": f"kakao-integration-{env}",  # Secrets scope for environment (e.g., dev, prod)
            # Retain files for 7 days when vacuuming
            "vacuum_retain_hours": 7 * 24,  # Retain data in Delta Table for 7 days
            # Catalog and schema details
            "catalog": catalog,  # Databricks catalog
            "schema_base": schema_base,  # Base schema name
            # Bronze table for landing-to-bronze pipelines
            "bronze_table": f"bronze_kakao_{mode}_{env}",  # Dynamic table name for bronze layer
            # Landing volume details
            "landing_volume": "landing",  # Default landing volume name
            "landing_volume_path": (  # Path in the Azure Data Lake Store
                f"abfss://digital-health-catalog-landing-dev@dcmanagedprddev.dfs.core.windows.net/kakao_{env}_autoloader"
            ),
            # Directory structures for different data states
            "data_provider_path": f"{data_provider}",  # Path for raw data provider folder
            "incoming_path": f"{data_provider}/incoming",  # Path for incoming data files
            "failed_path": f"{data_provider}/failed",  # Path for failed records
            "loaded_path": f"{data_provider}/loaded",  # Path for successfully loaded files
            "checkpoint": f"{data_provider}/checkpoint",  # Path for Spark streaming checkpoints
            # Tables specified in the input schema
            "json_types": registered_input,  # Predefined list of tables to handle
            # Prefix for external volumes
            "external_vol_prefix": "landing/digital_health/kakao",  # Volume path prefix
            # Quality checks mapping based on environment
            "quality_checks": checks_enablement.get(env, {}),  # Retrieve enabled checks for the selected environment
            # Flag to determine whether to recreate the environment
            "recreate_env": True,  # If True, environment will be cleared and recreated
        }
    }


# Example Usage
# Call the configuration function with parameters
mode = "batch"  # Specify mode (e.g., batch processing)
env = "dev"  # Specify environment (e.g., dev, tst, prod)

# Retrieve the full Databricks configuration for this mode and environment
config = get_databricks_configuration(mode=mode, env=env)

# Print the configuration for inspection
print(config)

# Output (formatted for readability):
# {
#     "databricks": {
#         "secrets_scope": "kakao-integration-dev",
#         "vacuum_retain_hours": 168,
#         "catalog": "digital_health_catalog_bronze_dev",
#         "schema_base": "kakao_dev_batch",
#         "bronze_table": "bronze_kakao_batch_dev",
#         "landing_volume": "landing",
#         "landing_volume_path": "abfss://digital-health-catalog-landing-dev@dcmanagedprddev.dfs.core.windows.net/kakao_dev_autoloader",
#         "data_provider_path": "kakao",
#         "incoming_path": "kakao/incoming",
#         "failed_path": "kakao/failed",
#         "loaded_path": "kakao/loaded",
#         "checkpoint": "kakao/checkpoint",
#         "json_types": [
#             "consent_status",
#             "medication",
#             "user_profile",
#             "insulin",
#             "smart_pen",
#             "walking",
#             "meal",
#             "smart_pen_error_events",
#             "workout"
#         ],
#         "external_vol_prefix": "landing/digital_health/kakao",
#         "quality_checks": {
#             "BKS": False,
#             "PKS": True,
#             "REFS": True,
#             "FKS": False,
#             "SCHEMA": True,
#             "SCD": True,
#             "CLEAR_STATUS": True,
#             "VALUE_MAX_LENGTH": True
#         },
#         "recreate_env": True
#     }
# }
```
Repository source: DHO-PoC1/src/conf/databricks_config.py
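Since the `tst`, `val`, and `prod` environments currently map to an empty dictionary of checks, consumers of `quality_checks` need a defined behavior for a missing key. The sketch below shows one possible convention (a missing check counts as disabled). The helper `is_check_enabled` is hypothetical and not part of the repository.

```python
from typing import Any


def is_check_enabled(conf: dict[str, Any], check: str) -> bool:
    """Hypothetical helper: a check missing from the config counts as disabled."""
    return bool(conf["databricks"].get("quality_checks", {}).get(check, False))


# Trimmed-down configs shaped like the output of get_databricks_configuration()
dev_conf = {"databricks": {"quality_checks": {"PKS": True, "FKS": False}}}
prod_conf = {"databricks": {"quality_checks": {}}}  # no checks defined yet

pks_dev = is_check_enabled(dev_conf, "PKS")
fks_dev = is_check_enabled(dev_conf, "FKS")
pks_prod = is_check_enabled(prod_conf, "PKS")
print(pks_dev, fks_dev, pks_prod)
```

Under this convention an environment with no checks defined silently skips validation, so a stricter default may be preferable before production use.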
Infrastructure#
Infrastructure provides the backbone of the pipeline by orchestrating schema creation, directory setup, and managing external volumes. These functions ensure that pipelines start with a properly configured environment.
clear_schema_setup() ℹ️#
```python
def clear_schema_setup(svcs: dict[Any, Any], catalog: str, schema: str) -> None:
    """
    Drops the specified schema, and every object in it, within the given catalog.
    """
    # Execute SQL commands to drop the schema and its contents
    databricks_common.run(
        svcs,
        (
            f"USE CATALOG {catalog}",  # Switch to the specified catalog
            f"DROP SCHEMA IF EXISTS {schema} CASCADE",  # Drop schema and all objects in it
        ),
    )


# Example Usage
services = get_svcs("dev")  # Initialize Databricks services
catalog = "example_catalog"  # The target catalog
schema = "example_schema"  # The target schema to clear
clear_schema_setup(services, catalog, schema)  # Drop the schema and all objects in it
```
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Drops the specified schema, including all objects in it (tables, views, etc.), to reset the environment. | - svcs (dict): Spark services containing the logger and execution utilities.<br>- catalog (str): The catalog where the schema exists.<br>- schema (str): The schema to be dropped and cleared. | Executes schema-clearing SQL commands without returning a value. | When resetting a schema environment to avoid clutter during iterative development or deployment. |
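
Because the catalog and schema names are interpolated directly into a destructive `DROP SCHEMA ... CASCADE` statement, it can be worth validating them first. The following is a minimal sketch of such a guard. The helper `drop_schema_statements` and its deliberately strict identifier pattern are assumptions for illustration, not part of the repository.

```python
import re

# Assumption: only plain identifiers (letters, digits, underscores) are accepted.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def drop_schema_statements(catalog: str, schema: str) -> tuple[str, str]:
    """Build the two statements, rejecting names that are not plain identifiers."""
    for name in (catalog, schema):
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"Unsafe identifier: {name!r}")
    return (
        f"USE CATALOG {catalog}",
        f"DROP SCHEMA IF EXISTS {schema} CASCADE",
    )


statements = drop_schema_statements("example_catalog", "example_schema")
print(statements)
```

The returned tuple has the same shape as the one passed to `databricks_common.run` above, so the guard could sit in front of that call.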
create_directories() ℹ️#
```python
def create_directories(root_path: str, conf: dict[Any, Any]) -> None:
    """
    Creates a directory structure for the landing zone and associated folders.
    """
    # Extract directory paths from the configuration dictionary
    data_provider_path = conf["databricks"]["data_provider_path"]
    incoming_path = conf["databricks"]["incoming_path"]
    failed_path = conf["databricks"]["failed_path"]
    loaded_path = conf["databricks"]["loaded_path"]
    checkpoint = conf["databricks"]["checkpoint"]

    # Create the required directories in Databricks Filesystem (DBFS)
    dbutils.fs.mkdirs(f"{root_path}/{data_provider_path}")  # Directory for data providers
    dbutils.fs.mkdirs(f"{root_path}/{incoming_path}")  # Directory for incoming data
    dbutils.fs.mkdirs(f"{root_path}/{failed_path}")  # Directory for failed records
    dbutils.fs.mkdirs(f"{root_path}/{loaded_path}")  # Directory for loaded files
    dbutils.fs.mkdirs(f"{root_path}/{checkpoint}")  # Directory for Spark checkpoints


# Example Usage
root_path = "/Volumes/example_catalog/example_schema/raw_data"  # Root path of the file system
config = {
    "databricks": {
        "data_provider_path": "provider_path",  # Path for data provider
        "incoming_path": "incoming",  # Path for incoming data
        "failed_path": "errors",  # Path for failed records
        "loaded_path": "loaded",  # Path for processed data
        "checkpoint": "checkpoints",  # Path for Spark checkpoints
    }
}

# Create directories for data processing
create_directories(root_path, config)
# Output: The specified directories are created in DBFS.
```
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Creates a structured directory system for raw data, failed records, loaded data, incoming data, and checkpoints in the Databricks Filesystem (DBFS). | - root_path (str): The root path for the landing zone.<br>- conf (dict): Configuration dictionary defining directory structures. | Creates directories in DBFS without returning a value. | Before data ingestion workflows to ensure required directories (e.g., incoming, checkpoints) exist. |
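
The five `mkdirs` calls above differ only in the configuration key they read, so the target paths can also be computed in a loop and inspected before anything is created. A minimal sketch, assuming a hypothetical helper `directory_paths` that is not part of the repository:

```python
from typing import Any

# Configuration keys that each name one directory under the root path
DIRECTORY_KEYS = (
    "data_provider_path",
    "incoming_path",
    "failed_path",
    "loaded_path",
    "checkpoint",
)


def directory_paths(root_path: str, conf: dict[str, Any]) -> list[str]:
    """Return the full path of every directory the pipeline expects."""
    return [f"{root_path}/{conf['databricks'][key]}" for key in DIRECTORY_KEYS]


example_conf = {
    "databricks": {
        "data_provider_path": "provider_path",
        "incoming_path": "incoming",
        "failed_path": "errors",
        "loaded_path": "loaded",
        "checkpoint": "checkpoints",
    }
}

paths = directory_paths("/Volumes/example_catalog/example_schema/raw_data", example_conf)
for path in paths:
    print(path)  # in a Databricks notebook, each path could be passed to dbutils.fs.mkdirs
```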
create_schema() ℹ️#
```python
def create_schema(svcs: dict[Any, Any], conf: dict[Any, Any], create_dirs: bool = False) -> None:
    """
    Creates schemas, tables, and external volumes if they do not already exist.
    Optionally clears existing schemas and creates directories.
    """
    # Extract catalog, schema, and volume details from the configuration
    catalog = conf["databricks"]["catalog"]
    schema = conf["databricks"]["schema_base"]
    # Read the recreate flag from the config (a missing key means "do not recreate")
    recreate_environment = bool(conf["databricks"].get("recreate_env", False))
    volume_landing = conf["databricks"]["landing_volume"]
    volume_landing_location = conf["databricks"]["landing_volume_path"]
    volume_fs_root = f"/Volumes/{catalog}/{schema}/{volume_landing}"

    # Clear the existing schema if `recreate_environment` is True
    if recreate_environment:
        svcs.get("logger").warning(f"Clearing schema {catalog}.{schema}")
        clear_schema_setup(svcs, catalog, schema)

    # Create schema and set up external volumes
    databricks_common.run(
        svcs,
        (
            f"USE CATALOG {catalog}",  # Switch to the specified catalog.
            f"CREATE SCHEMA IF NOT EXISTS {schema}",  # Create schema if not present.
            f"CREATE EXTERNAL VOLUME IF NOT EXISTS {schema}.{volume_landing} LOCATION '{volume_landing_location}'",
        ),
    )

    # Create directories if `create_dirs` is enabled
    if create_dirs:
        create_directories(volume_fs_root, conf)


# Example Usage
services = get_svcs("dev")  # Initialize Databricks services
config = {
    "databricks": {
        "catalog": "example_catalog",
        "schema_base": "example_schema",
        "recreate_env": False,  # Keep the existing schema
        "landing_volume": "raw_data",
        "landing_volume_path": "/mnt/raw_data/",
        "json_types": ["table1", "table2"],  # JSON table names
        # Directory keys read by create_directories() when create_dirs=True
        "data_provider_path": "provider_path",
        "incoming_path": "incoming",
        "failed_path": "errors",
        "loaded_path": "loaded",
        "checkpoint": "checkpoints",
    }
}
create_dirs = True

# Invoking the schema creation
create_schema(services, config, create_dirs)

# Outputs:
# 1. Schema is created if it does not already exist.
# 2. Tables and views are created for the configured JSON types.
# 3. Directory structure is created in DBFS if `create_dirs=True`.
```
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Creates schemas, tables, and external volumes for Databricks pipelines, optionally clearing existing ones and generating directory structures. | - svcs (dict): Databricks services containing Spark, Logger, and execution utilities.<br>- conf (dict): Pipeline configuration defining catalog, schema, and volume settings.<br>- create_dirs (bool): Optional flag to also create directory structures for the schema. | Executes schema, table, and volume creation SQL commands without returning a value. | During initial raw ingestion or schema layer setup for preparing Bronze layer resources and pipelines. |
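
One way to review what `create_schema()` is about to run is to build the statements as plain strings first and log them before execution. The sketch below does this for the three statements shown above. The helper `schema_setup_statements` is hypothetical, and the storage location is a placeholder rather than a real account.

```python
from typing import Any


def schema_setup_statements(conf: dict[str, Any]) -> list[str]:
    """Return the setup statements for a dry run, without executing anything."""
    db = conf["databricks"]
    schema, volume = db["schema_base"], db["landing_volume"]
    return [
        f"USE CATALOG {db['catalog']}",
        f"CREATE SCHEMA IF NOT EXISTS {schema}",
        f"CREATE EXTERNAL VOLUME IF NOT EXISTS {schema}.{volume} LOCATION '{db['landing_volume_path']}'",
    ]


dry_run_conf = {
    "databricks": {
        "catalog": "example_catalog",
        "schema_base": "example_schema",
        "landing_volume": "raw_data",
        # Placeholder location, not a real storage account
        "landing_volume_path": "abfss://container@account.dfs.core.windows.net/landing",
    }
}

setup_statements = schema_setup_statements(dry_run_conf)
for statement in setup_statements:
    print(statement)
```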
create_view() ℹ️#
```python
def create_view(svcs: dict, catalog: str, schema: str, table: str, layer: str, history_type: str) -> None:
    """
    Creates a SQL view for the specified table by applying harmonization, field mapping, and quality checks.

    :param svcs: Spark services including SparkSession and logger utilities.
    :param catalog: Target Databricks catalog where the schema exists.
    :param schema: Schema in the catalog where the view will be created.
    :param table: Name of the table for which the view is being created.
    :param layer: Pipeline layer (Bronze/Silver/Gold) where the view belongs.
    :param history_type: History policy for Slowly Changing Dimensions (e.g., SCD1, SCD2).
    :return: None
    """
    # Step 1: Fetch schema and transformations
    # Get all fields from JSON schema
    fields = JSON_SCHEMAS[layer.upper()].get(table).fields
    conversions = SCHEMA_CONVERSION[layer.upper()]  # Mapping rules for time and field transformations
    harmonization = QUALITY_CHECKS.get(Constants.HARMONIZATION).get(table)  # Join logic for lookup tables

    # Step 2: Harmonization (joining external reference tables)
    harmonization_join = ""
    if harmonization:
        harmonization_field = list(harmonization.keys())[0]  # Field to harmonize
        harmonization_table = harmonization[harmonization_field]["table"]  # Lookup table
        harmonization_join = f"JOIN {harmonization_table} b ON a.{harmonization_field} = b.value"

    # Step 3: Map fields and transform time fields
    mapped_time_field = conversions.get(Constants.BRONZE_TO_SILVER_TIME_FIELDS_MAP, {}).get(table, {})
    transformed_fields = [
        f"a.{field.name} AS {databricks_utils.camel_to_snake(field.name)}"  # Convert to snake_case
        for field in fields
        if field.name not in mapped_time_field.keys()
    ]
    for src_field, dest_field in mapped_time_field.items():
        transformed_fields.append(f"timestamp(a.{src_field}) AS {databricks_utils.camel_to_snake(dest_field)}")
        transformed_fields.append(f"a.{src_field} AS {databricks_utils.camel_to_snake(dest_field)}_local")

    # Step 4: Add validation and quality rules to the SQL view
    view_sql = f"""
    CREATE OR REPLACE VIEW `{catalog}`.`{schema}`.`view_{table}` AS
    SELECT {", ".join(transformed_fields)}
    FROM `{catalog}`.`{schema}`.`{table}` a {harmonization_join}
    WHERE sys_dq_status < 2
    AND COALESCE(sys_is_deleted, False) = FALSE
    """

    # Step 5: Execute the SQL command to create the view
    databricks_common.run(svcs, (f"USE CATALOG `{catalog}`", view_sql))


# Example SQL created by the function
"""
CREATE OR REPLACE VIEW `analytics_catalog`.`silver_schema`.`view_sales` AS
SELECT
    a.sale_id AS sale_id,
    a.sale_amount AS sale_amount,
    a.currency AS currency,
    timestamp(a.sale_time) AS sale_time_utc,
    a.sale_time AS sale_time_local,
    b.discount_rate AS discount_rate
FROM `analytics_catalog`.`bronze_schema`.`sales` a
JOIN discounts_table b ON a.discount_code = b.discount_code
WHERE sys_dq_status < 2
AND COALESCE(sys_is_deleted, False) = FALSE;
"""
```
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Creates views for specific tables, applying harmonization logic, field mapping, and validations such as filtering duplicates, handling quality issues, and aligning timestamps. | - svcs (dict): Spark services for executing commands, including SparkSession and logger.<br>- catalog (str): Databricks catalog where the schema resides.<br>- schema (str): Schema where the view will be created.<br>- table (str): Raw table name used to create the view.<br>- layer (str): Target pipeline layer (e.g., Bronze, Silver, or Gold).<br>- history_type (str): Historical dimension type (e.g., SCD1 or SCD2). | Executes SQL commands to generate views tailored to the table and layer configuration. | Use this function when generating Silver or Gold layer views to standardize data, integrate reference tables, and enforce validation rules. |
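
The field-mapping step (Step 3) can be tried in isolation with plain field names instead of Spark schema objects. The sketch below is a simplified stand-in: `select_expressions` is a hypothetical helper, the field names are made up, and `camel_to_snake` is re-implemented locally so the example runs on its own.

```python
def camel_to_snake(name: str) -> str:
    """Local copy of the camelCase to snake_case conversion used by the view builder."""
    out: list[str] = []
    for ch in name:
        if ch.isupper() and out:
            out.append("_")
        out.append(ch.lower())
    return "".join(out)


def select_expressions(field_names: list[str], time_map: dict[str, str]) -> list[str]:
    """Build SELECT expressions: renamed plain fields, then the mapped time fields."""
    exprs = [f"a.{n} AS {camel_to_snake(n)}" for n in field_names if n not in time_map]
    for src, dest in time_map.items():
        exprs.append(f"timestamp(a.{src}) AS {camel_to_snake(dest)}")
        exprs.append(f"a.{src} AS {camel_to_snake(dest)}_local")
    return exprs


exprs = select_expressions(["userId", "startTime"], {"startTime": "startTimeUtc"})
for expr in exprs:
    print(expr)
```

Each mapped time field yields two columns: one cast with `timestamp()` and one keeping the original value under a `_local` suffix.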
Infrastructure Functions Summary#
| Function Name | What It Does | Input | Output | When to Use |
|---|---|---|---|---|
| clear_schema_setup() | Deletes all objects in the provided schema/catalog to reset an environment using a cascade delete option. | svcs (dict), catalog (str), schema (str) | Executes schema-clearing SQL commands. | Before resetting an environment or preparing it for recreation. |
| create_directories() | Creates a directory structure (e.g., landing zone, checkpoints) in Databricks Filesystem (DBFS). | root_path (str), conf (dict) | Creates directories in DBFS. | Before pipelines requiring structured landing zones, checkpoints, or logs run. |
| create_schema() | Automates schema, table, and external volume creation, optionally clearing existing components and directories. | svcs (dict), conf (dict), create_dirs (bool) | Executes schema, table, and volume creation SQL commands. | During pipeline setup to ensure the schema and directories are ready for data ingestion workflows. |
| create_view() | Creates logical views for tables, applying harmonization, field mapping, and quality rules to generate analytics-ready data. | svcs (dict), catalog (str), schema (str), table (str), layer (str), history_type (str) | Executes SQL commands to define or update views for specific tables and pipeline layers. | When creating views for Silver or Gold layers to provide clean, enriched, and validated data for downstream analytics. |
Repository source: DHO-PoC1/src/dhopoc1/databricks_infrastructure.py
Utility Files#
Utility files provide reusable and modular functions for handling common operations such as schema conversion, logging, and metadata enrichment. These files streamline development by reducing redundancy and centralizing commonly used logic.
get_logger() ℹ️#
```python
import logging


def get_logger():
    """Returns a logger with the name of the current module."""
    logging.basicConfig(
        level=logging.INFO,
        format="[%(asctime)s][%(levelname)s] %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S%z",
    )
    logging.getLogger("py4j").setLevel(logging.ERROR)  # Silence verbose py4j messages
    return logging.getLogger(__name__)


# Example Usage
# Initialize logger
logger = get_logger()  # Input: None, Output: logger instance

# Log a message
logger.info("Pipeline started")  # Logs informational message
logger.error("An error has occurred!")  # Logs error message
```
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Provides a centralized logging mechanism to track pipeline execution, debug issues, and record error details. This ensures traceability and simplifies debugging for large-scale workflows. | None | A configured Logger instance | - To log pipeline progress, warnings, or critical errors. - For debugging issues or tracking metadata during data ingestion. |
get_svcs() ℹ️#
```python
def get_svcs(env: str | None) -> dict[Any, Any]:
    """
    Initializes Databricks services including SparkSession, DBUtils, Logger,
    and job-specific parameters.

    :param env: Specifies the runtime environment (e.g., `dev`, `test`, `prod`).
    :return: A dictionary containing foundational services and job-specific settings.
    """
    return {
        "spark": databricks_common.get_spark(),  # SparkSession initialization
        "dbutils": databricks_common.get_dbutils(),  # Filesystem operations
        "logger": get_logger(),  # Logger instance for tracking pipeline progress
        "env": env,  # Current environment (e.g., dev or prod)
        "args": databricks_common.get_job_run_id_params(),  # Job-specific parameters
    }


# Example Usage:
env = "dev"
services = get_svcs(env)
spark_session = services["spark"]  # Access the SparkSession
logger = services["logger"]  # Access logger instance
job_params = services["args"]  # Fetch job-specific arguments
```
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Initializes Databricks services like SparkSession, DBUtils for filesystem operations, Logger for logging, and job-specific parameters (args) for workflow execution. Ensures smooth operation of pipelines. | env (str): Specifies the runtime environment (e.g., dev, test, or prod). | A dictionary containing:<br>- spark: SparkSession instance.<br>- dbutils: Utilities for file and credential management.<br>- logger: Logger for tracking progress and errors.<br>- env: Runtime environment.<br>- args: Job parameters like run_id or other pipeline-specific details. | - Use at the start of pipelines to initialize foundational services and fetch job arguments.<br>- For workflows requiring consistent logging and Spark/DBUtils initialization. |
get_volume_path() ℹ️#
```python
def get_volume_path(conf: dict[Any, Any], config_volume_name: str) -> str:
    """
    Returns volume path
    """
    return f'/Volumes/{conf["databricks"]["catalog"]}/' f'{conf["databricks"]["schema_base"]}/' f"{config_volume_name}"


# Example Usage
# Input: Configuration dictionary and volume name
conf = {
    "databricks": {
        "catalog": "example_catalog",  # Catalog name
        "schema_base": "example_schema",  # Schema name
    }
}
volume_name = "landing_volume"

# Generate the volume path
volume_path = get_volume_path(conf, volume_name)
# Output: "/Volumes/example_catalog/example_schema/landing_volume"
```
| **What It Does** | **Input** | **Output** | **When to Use** |
|----------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------|--------------------------------------------|---------------------------------------------------------------------------------------------------------------|
| Dynamically generates volume paths for various processing stages (e.g., landing zones, checkpoints) to eliminate hardcoding and ensure adaptability across pipelines. | - `conf` (dict): Configuration dictionary containing catalog, schema, and volume details.<br>- `config_volume_name` (str): The name of the volume (e.g., "landing_volume"). | A dynamically constructed **string path** representing the volume directory. | - To create dynamic paths for raw data, checkpoints, or output directories.<br>- When pipelines need flexible paths for different environments (e.g., dev, test, prod). |
---
#### **sql_convert_schema_to_create_table()** ℹ️ {.hover-title}
<div class="hover-content">
```python
from pyspark.sql.types import BooleanType, FloatType, IntegerType, StringType, TimestampType


def sql_convert_schema_to_create_table(catalog: str, schema: str, table_name: str) -> str:
    """
    Converts the given schema into a SQL CREATE TABLE statement for a Delta table.

    :param catalog: The Databricks catalog where the table will reside.
    :param schema: The schema within the catalog where the table will be created.
    :param table_name: The name of the table to be created.
    :return: A CREATE TABLE SQL statement as a string.
    """
    # Start the CREATE TABLE statement with the provided catalog, schema, and table name
    create_table_statement = f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{table_name} (\n"

    # Iterate over each field defined in the JSON_SCHEMAS_BRONZE for the table
    for field in JSON_SCHEMAS_BRONZE.get(table_name).fields:
        field_name = field.name  # Get the name of the field
        field_type = field.dataType.typeName()  # Get the field's Spark data type

        # Map Spark data types to SQL-compatible data types
        if isinstance(field.dataType, IntegerType):
            field_type = "INT"
        elif isinstance(field.dataType, FloatType):
            field_type = "FLOAT"
        elif isinstance(field.dataType, StringType):
            field_type = "STRING"
        elif isinstance(field.dataType, TimestampType):
            field_type = "TIMESTAMP"
        elif isinstance(field.dataType, BooleanType):
            field_type = "BOOLEAN"
        else:
            # Raise an error if the type is unsupported
            raise ValueError(f"Unsupported data type: {field.dataType}")

        # Append the field definition to the CREATE TABLE statement
        create_table_statement += f"  {field_name} {field_type},\n"

    # Remove the trailing comma and close the CREATE TABLE statement
    create_table_statement = create_table_statement.rstrip(",\n") + "\n)"

    # Return the generated CREATE TABLE SQL statement
    return create_table_statement


# Example Usage
# Input: Catalog, schema, and table name
catalog = "example_catalog"  # The name of the Databricks catalog
schema = "example_schema"  # The schema within the catalog
table_name = "example_table"  # The name of the table to be created

# Generate the SQL CREATE TABLE statement
sql_statement = sql_convert_schema_to_create_table(catalog, schema, table_name)

# Output: A SQL statement defining the table structure
print(sql_statement)
```
</div>
| **What It Does** | **Input** | **Output** | **When to Use** |
|----------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------|------------------------------------------------|---------------------------------------------------------------------------------------------------------|
| Automates the creation of SQL `CREATE TABLE` statements by translating Spark schema definitions into SQL-compatible table structures. This ensures schema alignment between raw data and downstream consumers. | - `catalog` (str): The Databricks catalog where the table will reside.<br>- `schema` (str): The schema within the catalog.<br>- `table_name` (str): Delta table name to create. | A valid **SQL string** for a `CREATE TABLE` statement. | - During infrastructure setup when creating Delta tables dynamically.<br>- For automating table creation workflows across pipelines. |
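
The `isinstance` chain above can also be expressed as a lookup table, which keeps the supported types in one place. The sketch below works on plain type-name strings (the values Spark's `typeName()` returns for these five types) so it runs without a Spark installation. The helper `create_table_sql` and the table name are hypothetical.

```python
# Spark type names (as returned by DataType.typeName()) mapped to SQL column types
SPARK_TO_SQL = {
    "integer": "INT",
    "float": "FLOAT",
    "string": "STRING",
    "timestamp": "TIMESTAMP",
    "boolean": "BOOLEAN",
}


def create_table_sql(full_name: str, fields: list[tuple[str, str]]) -> str:
    """Build a CREATE TABLE statement from (column name, Spark type name) pairs."""
    columns = []
    for name, type_name in fields:
        if type_name not in SPARK_TO_SQL:
            raise ValueError(f"Unsupported data type: {type_name}")
        columns.append(f"  {name} {SPARK_TO_SQL[type_name]}")
    # Joining the columns avoids having to strip a trailing comma afterwards
    return f"CREATE TABLE IF NOT EXISTS {full_name} (\n" + ",\n".join(columns) + "\n)"


sql = create_table_sql("c.s.t", [("id", "integer"), ("name", "string")])
print(sql)
```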
---
#### **camel_to_snake()** ℹ️ {.hover-title}
<div class="hover-content">
```python
def camel_to_snake(camel_str: str) -> str:
    """
    Converts a camelCase string to snake_case.
    """
    # Create a components list to hold the transformed string
    components = []
    for char in camel_str:
        # Insert underscores before uppercase letters
        if char.isupper() and components:
            components.append("_")
        components.append(char.lower())  # Convert all characters to lowercase

    # Join the components list into a snake_case string
    return "".join(components)


# Example Usage:
# Convert camelCase to snake_case
field = "userName"
formatted_field = camel_to_snake(field)
# Output: "user_name"
```
</div>
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Converts camelCase field names to snake_case format to ensure compatibility with SQL standards and consistent field naming. | camel_str (str): A string field name in camelCase. | String formatted in snake_case. | Use when transforming raw field names for table or view creation in Databricks pipelines. |
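
One behavior worth knowing is that the character-by-character approach inserts an underscore before every uppercase letter, so acronyms are split letter by letter. The sketch below reproduces that behavior and compares it with a regex-based variant. The variant `camel_to_snake_regex` is an illustrative alternative, not the repository's implementation.

```python
import re


def camel_to_snake(camel_str: str) -> str:
    """Same logic as above: underscore before every uppercase letter except the first."""
    components: list[str] = []
    for char in camel_str:
        if char.isupper() and components:
            components.append("_")
        components.append(char.lower())
    return "".join(components)


def camel_to_snake_regex(camel_str: str) -> str:
    """Illustrative variant: underscore only where a lowercase letter or digit meets an uppercase letter."""
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", camel_str).lower()


plain = camel_to_snake("userName")
acronym = camel_to_snake("userID")
acronym_regex = camel_to_snake_regex("userID")
print(plain, acronym, acronym_regex)
```

Which result is preferable depends on how downstream column names are expected to look, so the choice is a naming convention rather than a correctness issue.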
clone_test_data() ℹ️#
```python
import os
import shutil
from datetime import datetime
from typing import Any


def clone_test_data(config: dict[str, Any]) -> None:
    """
    Clones test data JSON files in the landing zone, prefixing each copy with a timestamp.
    """
    # Step 1: Resolve the root landing path using the configuration
    landing_path = get_volume_path(conf=config, config_volume_name=config["databricks"]["landing_volume"])
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")  # Current timestamp for new file naming

    # Step 2: Iterate through the landing zone and process only JSON files
    for root, _, files in os.walk(landing_path):
        for file in files:
            # Skip previously cloned files (ending in "_.json") and hidden metadata files
            if file.endswith(".json") and not file.endswith("_.json") and not file.startswith("_"):
                # Mark the copy with a trailing underscore and prefix it with the timestamp
                dst_file = file.replace(".json", "_.json")
                shutil.copy2(f"{root}/{file}", f"{root}/{timestamp}_{dst_file}")


# Example Usage:
# Clone test data in a landing zone
config = {
    "databricks": {
        "catalog": "example_catalog",  # Required by get_volume_path()
        "schema_base": "example_schema",  # Required by get_volume_path()
        "landing_volume": "landing_zone",
    }
}
clone_test_data(config)
# New timestamped files will appear alongside existing ones in the landing zone.
```
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Clones test JSON files, prefixing each copy with a timestamp, to simulate new file arrivals in the landing zone. This is useful for testing workflows and pipeline ingestion. | config (dict): Configuration containing paths and relevant details for the landing zone. | Timestamped JSON files are created in the specified landing zone directory. | Use during development or testing to validate workflows with new test data. |
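
The file-selection and naming rules inside `clone_test_data()` can be checked without touching the file system. The sketch below isolates them in two small functions. The names `should_clone` and `cloned_name` and the sample file names are hypothetical and used only for illustration.

```python
def should_clone(file_name: str) -> bool:
    """True for source JSON files; False for earlier clones and hidden metadata files."""
    return (
        file_name.endswith(".json")
        and not file_name.endswith("_.json")
        and not file_name.startswith("_")
    )


def cloned_name(file_name: str, timestamp: str) -> str:
    """Name of the copy: timestamp prefix plus an underscore before the extension."""
    return f"{timestamp}_{file_name.replace('.json', '_.json')}"


clone = cloned_name("workout.json", "20240101120000")
print(clone)
print(should_clone("workout.json"), should_clone(clone), should_clone("_metadata.json"))
```

The trailing underscore is what keeps a second run from cloning the clones, since copies end in `_.json` and are skipped by the filter.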
autoloader_data() ℹ️#
def autoloader_data(svcs: dict[Any, Any], conf: dict[str, Any], table_name: str, args: dict[str, str]) -> None:
"""
Reads input JSON files from a source path and loads them into a Delta Bronze table,
enriching the data with additional metadata fields. Uses Databricks Auto Loader for streaming.
:param svcs: Databricks services (e.g., SparkSession, Logger, etc.).
:param conf: Configuration dictionary containing paths, catalog, schema, etc.
:param table_name: Name of the Delta Bronze table where the processed data is written.
:param args: Job-specific parameters, such as `run_id`.
"""
# Step 1: Extract catalog, schema, and other configurations
catalog = conf["databricks"]["catalog"]
schema = conf["databricks"]["schema_base"]
incoming_path = conf["databricks"]["incoming_path"]
failed_path = conf["databricks"]["failed_path"]
landing_volume = conf["databricks"]["landing_volume"]
checkpoint = conf["databricks"]["checkpoint"]
target_table = f"`{catalog}`.`{schema}`.`{table_name}`"
run_id = args["run_id"]
# Paths for data ingestion, checkpoints, and bad records
source_path = f"/Volumes/{catalog}/{schema}/{landing_volume}/{incoming_path}/{table_name}/"
checkpoint_path = f"/Volumes/{catalog}/{schema}/{landing_volume}/{checkpoint}/{table_name}/"
bad_files_path = f"/Volumes/{catalog}/{schema}/{landing_volume}/{failed_path}/{table_name}/"
# Log execution details for troubleshooting and monitoring
svcs.get("logger").info(f"Execution details: {args}")
svcs.get("logger").info(f"Source path: {source_path}")
# Step 2: Extract the table schema and metadata schema
table_json_schema = JSON_SCHEMAS.get(conf["databricks"]["layer"].upper(), {}).get(table_name)
metadata_schema = METADATA_SCHEMA
# Step 3: Read source data with Auto Loader and validate against schema
# Databricks Auto Loader handles incremental data ingestion.
# Invalid records (malformed JSON or incompatible schema) are redirected to `badRecordsPath`.
df = (
svcs["spark"]
.readStream.format("cloudFiles")
.option("cloudFiles.format", "json") # Expecting JSON file format
.schema(table_json_schema) # Apply predefined schema for input data
.option("cloudFiles.schemaLocation", checkpoint_path) # Store schema inference
.option("badRecordsPath", bad_files_path) # Redirect invalid JSON files
.load(source_path) # Load records from the specified source path
.select("*", "_metadata") # Include Auto Loader metadata for debugging and processing
)
# Step 4: Filter invalid data and enrich with metadata fields
# Filtering removes records missing _metadata (e.g., incomplete or malformed JSON).
df = df.filter(col("_metadata").isNotNull())
# Enrich data with cleaned metadata. This ensures `_metadata` is compatible with its schema.
df = df.withColumn("clean_metadata", col("_metadata").cast(metadata_schema))
df = df.drop("_metadata").withColumnRenamed("clean_metadata", "_metadata")
# Add system-generated metadata fields for lineage and traceability
# `sys_received_at`: Timestamp for when the pipeline ingests the record
df = df.withColumn("sys_received_at", current_timestamp())
# `sys_source_filename`: Name of the file the record originates from
df = df.withColumn("sys_source_filename", df["_metadata"].file_name)
# `sys_source_name`: Name of the source system (e.g., "kakao" in this case)
df = df.withColumn("sys_source_name", lit("kakao")) # Example value for system source
# System flags
# `sys_is_deleted`: False for all incoming records in the Bronze layer
df = df.withColumn("sys_is_deleted", lit(False))
# `sys_job_id`: Identifier assigned to the specific pipeline job execution
df = df.withColumn("sys_job_id", lit(run_id))
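# Step 5: Generate hash keys and surrogate keys to uniquely identify records
# Generate a hash key for deduplication and change detection
# `sys_hash_key` fingerprints the business columns only. The `sys_*` fields and `_metadata` are excluded
# because values such as `sys_received_at` and `sys_job_id` differ on every run and would change the hash
# even when the business data is identical.
business_cols = [c for c in df.columns if c != "_metadata" and not c.startswith("sys_")]
concat_col = concat_ws("|", *[col(c) for c in business_cols])
df = df.withColumn("sys_hash_key", md5(concat_col)) # MD5 hash for efficient change detection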
# Generate a surrogate primary key
# A UUID is used as a unique surrogate key (`sys_surrogate_pk`) for each record.
# This ensures that each record is uniquely identifiable, regardless of source system properties.
df = df.withColumn("sys_surrogate_pk", udf(generate_uuid, StringType())())
# Step 6: Write enriched data back into a Delta table
# The function writes to the target Bronze Delta table using structured streaming.
query = (
df.writeStream.format("delta")
.outputMode("append") # Incrementally append new records into the Delta table
.option("checkpointLocation", checkpoint_path) # Enable fault-tolerant checkpointing for idempotency
.trigger(once=True) # Process the data in one micro-batch
.toTable(target_table) # Save processed records into the Delta Bronze table
)
# Wait until the streaming query completes
query.awaitTermination()
# Log success message after successful ingestion and data write
svcs.get("logger").info(f"Successfully loaded data into: {target_table}")
# Example Usage:
# Initialize services and configuration
services = get_svcs("dev")
conf = {
    "databricks": {
        "layer": "bronze",  # Required: used to look up the table schema in JSON_SCHEMAS
        "catalog": "example_catalog",
        "schema_base": "example_schema",
        "landing_volume": "raw_data",
        "incoming_path": "incoming",
        "failed_path": "bad_records",
        "checkpoint": "checkpoints",
    }
}
args = {"run_id": "job12345"}
# Call the autoloader function
autoloader_data(services, conf, "example_table", args)
# Outputs:
# 1. Data from JSON files is read and written to the "example_table".
# 2. Invalid files are redirected to `failed_path` for troubleshooting.
# 3. Metadata fields like `sys_received_at`, `sys_surrogate_pk`, and `sys_hash_key` are added.
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Ingests JSON files into Delta Bronze tables, ensuring data quality through filtering and metadata enrichment. Applies surrogate key generation for traceability and schema validation for consistency. Tracks ingestion progress using checkpoint files for fault tolerance. | svcs (dict): Core services like Logger, SparkSession, and DBUtils; conf (dict): Configuration for paths, schema details, and metadata; table_name (str): Target Delta table; args (dict): Job-specific parameters, such as run_id. | None. Processed data includes metadata fields (sys_received_at, sys_job_id, sys_hash_key, sys_source_filename, and sys_surrogate_pk) and is written to Delta Bronze tables. Malformed records are redirected to the badRecordsPath. Checkpoint files ensure fault tolerance for batch or streaming ingestion. | Use for fault-tolerant real-time or batch ingestion pipelines. Ideal for processing raw JSON data into Bronze Delta tables that require unique identifiers, metadata enrichment, and incremental ingestion. |
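To illustrate why a hash key is useful for change detection, here is a minimal plain-Python sketch of the same idea: join the business values with `|` and take an MD5 digest. `record_hash` and the sample records are hypothetical, and `str()` rendering of values will not always match how Spark casts columns to strings, so the digests are not guaranteed to equal the ones Spark produces.

```python
import hashlib


def record_hash(record: dict, exclude_prefixes: tuple[str, ...] = ("sys_", "_metadata")) -> str:
    """MD5 over the '|'-joined business values; None values are skipped, as concat_ws skips nulls."""
    business_values = [
        str(value)
        for key, value in record.items()
        if not key.startswith(exclude_prefixes) and value is not None
    ]
    return hashlib.md5("|".join(business_values).encode("utf-8")).hexdigest()


# Hypothetical records: same business data received twice, then a changed name
first_load = {"id": 1, "name": "Ada", "sys_received_at": "2024-01-01T00:00:00"}
reload_same = {"id": 1, "name": "Ada", "sys_received_at": "2024-02-01T00:00:00"}
reload_changed = {"id": 1, "name": "Ada L.", "sys_received_at": "2024-02-01T00:00:00"}

print(record_hash(first_load) == record_hash(reload_same))     # True: business data unchanged
print(record_hash(first_load) == record_hash(reload_changed))  # False: business data changed
```

Excluding the system fields is the design choice that matters: if the receive timestamp were part of the hash, every reload would look like a change.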
generate_uuid() ℹ️#
def generate_uuid() -> str:
"""
Generates a universally unique identifier (UUID).
"""
return str(uuid.uuid4())
# Example Usage:
# Generate a UUID and assign it to a record
record_id = generate_uuid()
# Output: A unique string such as "3fa85f64-5717-4562-b3fc-2c963f66afa6"
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Generates a new Universally Unique Identifier (UUID), often used as a surrogate key for Delta table records. | None | A unique string identifier. | Use for creating unique records, enabling deduplication or version tracking in Bronze tables. |
add_sys_received_at_from_filename() ℹ️#
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def add_sys_received_at_from_filename(
    df: DataFrame,
    table_name: str,
    file_name_col: str = "_metadata.file_name",
    output_col: str = "sys_received_at",
) -> DataFrame:
    """
    Adds a `sys_received_at` timestamp column derived from filenames. Falls back to the file's modification time if no timestamp is found.
    """
    # Step 1: Define a regex pattern to extract timestamps such as "my_table_2024-01-15_10_30_00" from filenames
    pattern = f"{table_name}_(\\d{{4}}-\\d{{2}}-\\d{{2}}_\\d{{2}}_\\d{{2}}_\\d{{2}})"
    extracted = F.regexp_extract(F.col(file_name_col), pattern, 1)  # Empty string when the pattern does not match
    received_at_ts = F.to_timestamp(extracted, "yyyy-MM-dd_HH_mm_ss")  # Convert extracted string to timestamp type
    backup_received_at = F.col("_metadata.file_modification_time")  # Fallback: the file's modification time
    # Step 2: Use the parsed timestamp when one was extracted, otherwise the fallback
    sys_received_at = F.when((extracted.isNull()) | (extracted == ""), backup_received_at).otherwise(received_at_ts)
    return df.withColumn(output_col, sys_received_at)  # Add the new column to the DataFrame
# Example Usage:
# Add sys_received_at column to a DataFrame based on file names
df = spark.read.format("json").load("/path/to/data")
df_with_sys_received_at = add_sys_received_at_from_filename(
df=df,
file_name_col="_metadata.file_name",
table_name="my_table"
)
# New timestamp column ("sys_received_at") will be available in the resulting DataFrame.
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Extracts timestamps from filenames or _metadata. Defaults to file modification time if no valid timestamp is found. | df (DataFrame): Input DataFrame; file_name_col (str): Column containing file names; table_name (str): Source table prefix; output_col (str): Column for extracted timestamp. | Returns the input DataFrame with an added sys_received_at column. | Use during data ingestion workflows requiring timestamps derived from filenames or modification times. |
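The filename convention the function expects is easiest to see on a few sample names. This is a minimal sketch using Python's `re` module rather than Spark; `parse_received_at` and the file names are hypothetical, and unlike the Spark version it escapes `table_name` before building the pattern.

```python
import re
from datetime import datetime
from typing import Optional


def parse_received_at(file_name: str, table_name: str) -> Optional[datetime]:
    """Return the timestamp embedded as '<table>_YYYY-MM-DD_HH_MM_SS', or None if absent."""
    pattern = rf"{re.escape(table_name)}_(\d{{4}}-\d{{2}}-\d{{2}}_\d{{2}}_\d{{2}}_\d{{2}})"
    match = re.search(pattern, file_name)
    if match is None:
        return None  # The pipeline would fall back to the file modification time here
    return datetime.strptime(match.group(1), "%Y-%m-%d_%H_%M_%S")


print(parse_received_at("my_table_2024-01-15_10_30_00.json", "my_table"))
print(parse_received_at("my_table.json", "my_table"))
```

A name without the `_YYYY-MM-DD_HH_MM_SS` suffix yields `None`, which corresponds to the fallback branch in `add_sys_received_at_from_filename()`.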
update_sys_modified_at_on_hash_change() ℹ️#
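from typing import Any
from pyspark.sql import Window
from pyspark.sql import functions as F

def update_sys_modified_at_on_hash_change(
    svcs: dict[Any, Any],
    table_name: str,
    pk_columns: list[str],
    mapped: str,
    run_id: str,
) -> None:
    """
    Updates `sys_modified_at` in Delta tables where hash-key changes are detected for primary key columns.
    """
    spark = svcs.get("spark")
    # Step 1: Load the Delta table into a Spark DataFrame
    df = spark.table(table_name)
    # Step 2: Partition the data by primary keys and order it by `sys_received_at`
    w = Window.partitionBy(*pk_columns).orderBy("sys_received_at")
    # Detect hash changes across rows (null for the first row of each key, which has no predecessor)
    hash_change = (F.col("sys_hash_key") != F.lag("sys_hash_key", 1).over(w)).cast("int")
    # Step 3: Create a version number as a running sum of "new version" markers.
    # The marker is materialized first because window functions cannot be nested.
    is_new_version = F.when((F.row_number().over(w) == 1) | (hash_change == 1), 1).otherwise(0)
    df = df.withColumn("is_new_version", is_new_version)
    df = df.withColumn("version_num", F.sum("is_new_version").over(w))
    # Step 4: Assign new `sys_modified_at` timestamps
    version_window = Window.partitionBy(*pk_columns, "version_num")
    df = df.withColumn(
        "new_sys_modified_at",
        F.when(F.col("version_num") == 1, F.col(mapped).cast("timestamp"))
        .otherwise(F.min("sys_received_at").over(version_window)),
    )
    # Step 5: Merge the new values back into the Delta table.
    # A DataFrame cannot be interpolated into SQL text, so it is registered as a temporary view first.
    # Matching on primary keys, `sys_hash_key`, and `sys_received_at` is an assumption about what identifies a row.
    key_columns = [*pk_columns, "sys_hash_key", "sys_received_at"]
    updates = df.select(*key_columns, "new_sys_modified_at").dropDuplicates(key_columns)
    updates.createOrReplaceTempView("sys_modified_at_updates")
    on_clause = " AND ".join(f"target.{c} = updates.{c}" for c in key_columns)
    sql_merge = f"""
        MERGE INTO {table_name} AS target
        USING sys_modified_at_updates AS updates
        ON {on_clause}
        WHEN MATCHED THEN
          UPDATE SET target.sys_modified_at = updates.new_sys_modified_at
    """
    spark.sql(sql_merge)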
# Example Usage:
# Update the Delta table with new sys_modified_at values for changed records
update_sys_modified_at_on_hash_change(
svcs=services,
table_name="my_table",
pk_columns=["id"],
mapped="original_timestamp_column",
run_id="run_1234"
)
# The function will detect hash changes and update `sys_modified_at` values accordingly.
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Updates the sys_modified_at value for Delta table records if hash key changes are detected, ensuring SCD (Slowly Changing Dimension) handling. | svcs (dict): Databricks services (e.g., SparkSession); table_name (str): Fully qualified Delta table; pk_columns (list): Primary keys; mapped (str): Mapped column; run_id (str): Job execution ID. | Updates the Delta table's sys_modified_at column based on hash/key changes. | Use for Bronze-to-Silver layer transformations or historical data versioning that needs SCD logic. |
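The versioning rule (a new version starts on the first row of a key and whenever the hash differs from the previous row) can be traced by hand on a small list. The sketch below is plain Python for a single primary key; `assign_versions` and the sample rows are hypothetical. It reports the earliest `sys_received_at` of each version, whereas the Spark function takes the first version's timestamp from the `mapped` column.

```python
from datetime import datetime


def assign_versions(rows: list[dict]) -> list[dict]:
    """rows: records for ONE primary key, each with 'sys_hash_key' and 'sys_received_at'."""
    ordered = sorted(rows, key=lambda r: r["sys_received_at"])
    version, previous_hash, version_start = 0, None, None
    result = []
    for row in ordered:
        # First row of the key, or hash differs from the previous row: open a new version
        if version == 0 or row["sys_hash_key"] != previous_hash:
            version += 1
            version_start = row["sys_received_at"]
        previous_hash = row["sys_hash_key"]
        result.append({**row, "version_num": version, "version_started_at": version_start})
    return result


history = [
    {"sys_hash_key": "a", "sys_received_at": datetime(2024, 1, 1)},
    {"sys_hash_key": "a", "sys_received_at": datetime(2024, 1, 2)},
    {"sys_hash_key": "b", "sys_received_at": datetime(2024, 1, 3)},
    {"sys_hash_key": "a", "sys_received_at": datetime(2024, 1, 4)},
]
versions = assign_versions(history)
print([v["version_num"] for v in versions])  # [1, 1, 2, 3]
```

Note that the hash returning to `a` on the last row opens version 3 rather than rejoining version 1, because only adjacent rows are compared.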
quality_checks() ℹ️#
from typing import Any, Optional

def quality_checks(svcs: dict[Any, Any], conf: dict[str, Any], table_name: str, enable_checks: Optional[dict[str, bool]] = None) -> None:
"""
Performs quality checks on a Bronze Delta table. Updates quality flags for errors and validates schema, PK/FK constraints, and harmonization.
:param svcs: Databricks core services (e.g., SparkSession, Logger).
:param conf: Configuration settings for paths, schemas, etc.
:param table_name: Name of the Delta Bronze table being validated.
:param enable_checks: A dictionary of boolean flags to toggle specific checks (e.g., PK, FK, schema).
:return: None
"""
# Treat a missing `enable_checks` as "no optional checks enabled" so the `.get()` lookups below do not fail on None
enable_checks = enable_checks or {}
# Step 1: Prepare configurations
layer = conf["databricks"]["layer"] # Current pipeline layer (e.g., Bronze)
catalog = conf["databricks"]["catalog"] # Catalog for the table
schema = f"{conf['databricks']['schema_base']}" # Schema within the catalog
bronze_table = f"`{catalog}`.`{schema}`.`{table_name}`" # Fully qualified table name
# Load the Delta table into a DataFrame
df = svcs["spark"].read.format("delta").table(bronze_table)
records_for_quality_checks = df.count() # Track number of records checked
# Log details about the quality check being performed
svcs.get("logger").info(f"Starting quality checks on Bronze table {table_name} for: {records_for_quality_checks} records.")
# Fetch the primary key configuration
pks = QUALITY_CHECKS.get(Constants.PKS, {})
# -------------------------------------------------------------------------------------
# 0. Clear quality flags on new or existing records
# Check whether quality flags should be cleared before starting validations.
if enable_checks.get(Constants.CLEAR_STATUS, False):
dqf.clear_quality_flags(svcs=svcs, bronze_table=bronze_table)
svcs.get("logger").info(f"Cleared all quality flags for table: {bronze_table}.")
else:
dqf.set_quality_flags(svcs=svcs, bronze_table=bronze_table)
svcs.get("logger").info(f"Set initial quality flags for new records in table: {bronze_table}.")
# -------------------------------------------------------------------------------------
# 1. Schema Validation
# Ensure the table schema aligns with the expected schema definition.
if enable_checks.get(Constants.SCHEMA, False):
# Combine system fields and table-specific schema fields.
combined_schema = StructType(
JSON_SCHEMAS[layer.upper()].get(table_name).fields
+ JSON_SCHEMAS_SYS.get("sys").fields
)
# Check if schema matches the expected definition.
if not dqf.check_schema(svcs=svcs, df_or_table=df, expected_schema=combined_schema, table=bronze_table):
svcs.get("logger").error(f"Schema mismatch detected for table: {bronze_table}. Skipping further checks.")
return # Stop quality checks if the schema validation fails.
svcs.get("logger").info(f"Schema successfully validated for table: {bronze_table}.")
# -------------------------------------------------------------------------------------
# 2. Primary Key (PK) Checks
# Identify null or duplicate primary key violations in the table.
if enable_checks.get(Constants.PKS, False):
# Check for null values in the primary key columns
dqf.check_primary_key_nulls(svcs=svcs, pk_cols=pks.get(table_name), bronze_table=bronze_table)
svcs.get("logger").info(f"Primary key null check completed for table: {bronze_table}.")
# Check for duplicate primary key values across records
dqf.check_primary_key_duplicates(svcs=svcs, pk_cols=pks.get(table_name), bronze_table=bronze_table)
svcs.get("logger").info(f"Primary key duplicate check completed for table: {bronze_table}.")
# -------------------------------------------------------------------------------------
# 3. Reference (Lookup) Validation
# Validate that records in the table reference valid keys from external lookup tables.
if enable_checks.get(Constants.REFS, False):
refs = QUALITY_CHECKS.get(Constants.REFS, {})
for ref_key, ref_value in refs.get(table_name, {}).items():
# Validate against the lookup table and columns
dqf.check_lookup_values(
svcs=svcs,
bronze_table=bronze_table,
bronze_col_name=ref_key,
lookup_table=f"{catalog}.{ref_value.get('table')}",
lookup_col_name=ref_value.get("field")
)
svcs.get("logger").info(f"Reference validation completed for field `{ref_key}` in table: {bronze_table}.")
# -------------------------------------------------------------------------------------
# 4. Foreign Key (FK) Checks
# Ensure foreign key constraints are upheld and reference valid primary keys in external tables.
if enable_checks.get(Constants.FKS, False):
fks = QUALITY_CHECKS.get(Constants.FKS, {})
for fk_key, fk_value in fks.get(table_name, {}).items():
dqf.check_foreign_key(
svcs=svcs,
bronze_table=bronze_table,
bronze_col_fk=fk_key,
remote_table=f"{catalog}.{schema}.{fk_value.get('table')}",
remote_col_pk=fk_value.get("field")
)
svcs.get("logger").info(f"Foreign key validation completed for field `{fk_key}` in table: {bronze_table}.")
# -------------------------------------------------------------------------------------
# 5. Harmonization Validation
# Harmonize record values to ensure consistency (e.g., common formats).
if enable_checks.get(Constants.HARMONIZATION, False):
harm = QUALITY_CHECKS.get(Constants.HARMONIZATION, {})
for harm_key, harm_value in harm.get(table_name, {}).items():
dqf.check_harmonization_values(
svcs=svcs,
bronze_table=bronze_table,
bronze_col_name=harm_key,
lookup_table=f"{catalog}.{harm_value.get('table')}",
lookup_col_name=harm_value.get("field")
)
svcs.get("logger").info(f"Harmonization validation completed for field `{harm_key}` in table: {bronze_table}.")
# -------------------------------------------------------------------------------------
# 6. Check Length Constraints
# Verify that column values adhere to maximum lengths as defined in the schema.
if enable_checks.get(Constants.VALUE_MAX_LENGTH, False):
max_lengths = QUALITY_CHECKS.get(Constants.VALUE_MAX_LENGTH, {})
for limit_key, limit_value in max_lengths.get(table_name, {}).items():
dqf.check_max_length_field(
svcs=svcs,
bronze_table=bronze_table,
column=limit_key,
limit=limit_value,
)
svcs.get("logger").info(f"Length validation completed for field `{limit_key}` in table: {bronze_table}.")
# Example Usage:
services = get_svcs("dev")
conf = {
"databricks": {
"layer": "Bronze",
"catalog": "example_catalog",
"schema_base": "example_schema"
}
}
enable_checks = {
"SCHEMA": True,
"PKS": True,
"FKS": True,
"REFS": True,
"VALUE_MAX_LENGTH": True,
"HARMONIZATION": True,
"CLEAR_STATUS": False,
}
table_name = "example_table"
# Run quality checks
quality_checks(services, conf, table_name, enable_checks)
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Performs multiple quality validation checks on Bronze Delta tables, such as schema matching, primary key (PK), foreign key (FK), lookup validations, and harmonization checks. Tracks and flags errors, while ensuring clean and consistent data within the Bronze layer. | svcs (dict): Databricks services like SparkSession and Logger; conf (dict): Configuration settings for schema, catalog, and paths; table_name (str): Name of the Delta table to check; enable_checks (dict): A dictionary of boolean flags to enable or disable specific quality checks. | Updates quality flags (e.g., error identification) directly within the Bronze table. | Use for ensuring data quality in Bronze Delta tables, aligning records to schema, PK/FK rules, and reference values. This function is useful before propagating records to Silver or Gold layers. |
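The `dqf` helpers are not shown in this section, so the following is only a sketch of what the two primary key checks look for, written in plain Python over a list of dictionaries. `find_pk_violations` and the sample rows are hypothetical and are not the `dqf` implementation.

```python
from collections import Counter


def find_pk_violations(rows: list[dict], pk_cols: list[str]) -> dict[str, list]:
    """Report row indexes with a null primary key and key values that occur more than once."""
    null_rows = [i for i, row in enumerate(rows) if any(row.get(c) is None for c in pk_cols)]
    # Rows with null keys are reported separately and left out of the duplicate count
    keys = [tuple(row.get(c) for c in pk_cols) for i, row in enumerate(rows) if i not in null_rows]
    duplicates = sorted(key for key, count in Counter(keys).items() if count > 1)
    return {"null_pk_rows": null_rows, "duplicate_pks": duplicates}


sample_rows = [{"id": 1}, {"id": 2}, {"id": 2}, {"id": None}]
report = find_pk_violations(sample_rows, pk_cols=["id"])
print(report)  # {'null_pk_rows': [3], 'duplicate_pks': [(2,)]}
```

In the pipeline these findings would be written back as quality flags on the Bronze table rather than returned.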
Utility Functions Cheat Sheet#
| Function Name | Purpose and Utility | When to Use |
|---|---|---|
| get_logger() | Sets up centralized logging for tracking pipeline execution, debugging, and recording process-specific details. | Use in pipelines to trace execution progress, warnings, and critical errors. |
| get_svcs() | Initializes foundational services including SparkSession, Logger, and DBUtils required for pipeline execution. | Use at the start of workflows to set up foundational platform tools and fetch runtime arguments. |
| get_volume_path() | Dynamically generates paths for resources like landing zones, checkpoints, or output directories based on environment configuration. | Use when constructing paths dynamically for multi-environment pipelines, eliminating hardcoding. |
| sql_convert_schema_to_create_table() | Automates SQL CREATE TABLE statement generation by converting Spark schemas to SQL-compatible table structures. | Use when creating Delta table definitions dynamically for infrastructure setup or initialization. |
| autoloader_data() | Reads JSON files using Databricks Auto Loader, validates schema, enriches metadata, adds surrogate keys, and writes data into Delta Bronze tables for fault-tolerant ingestion. | Use for real-time or batch ingestion workflows where fault tolerance, deduplication, and metadata tracking are required. |
| clone_test_data() | Creates timestamped duplicates of existing JSON files in landing zones to simulate new arrivals, enabling testing and debugging workflows. | Use during development or testing to validate ingestion workflows with new test data. |
| camel_to_snake() | Converts a camelCase string to snake_case format to align with SQL standards and consistent field naming conventions. | Use when standardizing raw field names for table or view creation in Databricks transformations. |
| generate_uuid() | Generates a Universally Unique Identifier (UUID) to uniquely identify records, useful for surrogate key generation. | Use when creating unique identifiers for deduplication, version tracking, or record traceability. |
| update_sys_modified_at_on_hash_change() | Updates the sys_modified_at column in Delta tables when sys_hash_key changes, enabling Slowly Changing Dimension (SCD) handling automatically. | Use in Bronze-to-Silver transformations or historical data versioning workflows. |
| add_sys_received_at_from_filename() | Extracts timestamps from file names and adds them as the sys_received_at column; falls back to file modification time if no valid timestamp is found. | Use during file-based ingestion workflows for ensuring accurate record traceability using timestamps. |
| quality_checks() | Validates schema alignment, primary/foreign key integrity, lookups, harmonization, and length constraints for Delta Bronze tables. | Use to ensure data in Bronze tables meets quality rules before transforming into Silver/Gold tables. |
Click here to access the repository: DHO-PoC1/src/conf/databricks_utils.py
Common Files#
Common files provide framework-level reusable code that is shared across all pipelines. These files handle low-level tasks such as SQL execution and interacting with the Databricks Filesystem (DBFS). By centralizing these methods, they ensure core functionality is implemented consistently and efficiently.
parse_environment_name() ℹ️#
def parse_environment_name(env: str) -> str:
"""
Validates the provided environment name to ensure it matches
one of the predefined valid environments (e.g., "dev", "prd").
Raises a ValueError for invalid environment names.
:param env: Name of the environment (e.g., "dev", "prd").
:return: The same environment name if valid.
"""
# List of valid environment names
valid_env_names = ("dev", "tst", "val", "prd")
# Check if the environment name is valid
if env not in valid_env_names:
raise ValueError(f"Environment name must be one of {valid_env_names}")
return env
# Example Usage
try:
# Step 1: Validate a valid environment name
environment = parse_environment_name("dev") # Input: "dev" (valid environment)
print(environment) # Output: "dev"
# Step 2: Attempt validation of an invalid environment name
invalid_env = parse_environment_name("invalid") # Input: "invalid" (invalid option)
print(invalid_env)
except ValueError as e:
# Output: Error message indicating valid options
print(e) # Output: "Environment name must be one of ('dev', 'tst', 'val', 'prd')"
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Validates the environment name to ensure pipelines are executed in predefined environments like "dev", "prd", or "tst". | env (string): The name of the environment. | A validated environment name if valid. | Before running a pipeline to avoid errors caused by unintended or misconfigured execution environments. |
run() ℹ️#
def run(svcs: dict[Any, Any], statements: tuple[str, ...]) -> None:
"""
Executes a series of SQL statements sequentially using SparkSession.
Each statement is logged before execution for traceability.
:param svcs: Dictionary containing SparkSession and logger instance.
:param statements: Tuple of SQL statements to execute.
:return: None
"""
for statement in statements:
# Log the SQL statement before execution
svcs["logger"].info("Running SQL=%s", statement)
# Execute the SQL statement
svcs["spark"].sql(statement)
# Example Usage
# Step 1: Initialize services with SparkSession and logger
services = {
"spark": get_spark(), # SparkSession instance
"logger": get_logger(), # Logger instance
}
# Step 2: Define SQL statements to be executed
sql_statements = (
"CREATE DATABASE IF NOT EXISTS example_db", # Create a database
"CREATE TABLE IF NOT EXISTS example_db.example_table (id INT, name STRING)", # Create a table
"INSERT INTO example_db.example_table VALUES (1, 'John Doe')", # Insert a record
)
# Step 3: Use `run` to execute the SQL statements
run(services, sql_statements)
# Output:
# Logs each SQL statement and executes it via Spark. For example:
# Running SQL=CREATE DATABASE IF NOT EXISTS example_db
# Running SQL=CREATE TABLE IF NOT EXISTS example_db.example_table (id INT, name STRING)
# Running SQL=INSERT INTO example_db.example_table VALUES (1, 'John Doe')
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Executes a series of SQL statements sequentially using SparkSession, logging each query for traceability. | svcs (dict): Contains the SparkSession and logger instance; statements (tuple[str, ...]): SQL statements to execute. | None | For programmatically running SQL-based operations such as table creation, schema management, or data insertion. |
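Because `run()` only relies on `svcs["spark"].sql(...)` and `svcs["logger"].info(...)`, it can be exercised without a cluster by passing stand-ins. The sketch below repeats `run()` as shown above and pairs it with a hypothetical `RecordingSpark` class that merely records the SQL text; it is an assumption about how one might unit test the function, not part of the repository.

```python
import logging
from typing import Any


def run(svcs: dict[Any, Any], statements: tuple[str, ...]) -> None:
    """Same logic as the function above: log each statement, then execute it."""
    for statement in statements:
        svcs["logger"].info("Running SQL=%s", statement)
        svcs["spark"].sql(statement)


class RecordingSpark:
    """Hypothetical stand-in for SparkSession that only records SQL text."""

    def __init__(self) -> None:
        self.executed: list[str] = []

    def sql(self, statement: str) -> None:
        self.executed.append(statement)


fake_spark = RecordingSpark()
services = {"spark": fake_spark, "logger": logging.getLogger("pipeline")}
run(services, ("CREATE SCHEMA IF NOT EXISTS demo", "SELECT 1"))
print(fake_spark.executed)  # ['CREATE SCHEMA IF NOT EXISTS demo', 'SELECT 1']
```

Passing services as a dictionary is what makes this substitution possible: the function never imports Spark itself.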
get_job_run_id_params() ℹ️#
def get_job_run_id_params() -> dict[str, Any]:
"""
Get the job run id parameters from the command-line arguments.
Parses `job_id` and `run_id` using argparse.
:return: A dictionary containing job parameters.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--job_id")
parser.add_argument("--run_id")
args = parser.parse_args()
return vars(args)
# Example Usage
# Step 1: Run a pipeline with command-line arguments
# Command-line: python script.py --job_id my_job --run_id 12345
params = get_job_run_id_params()
print(params) # Output: {'job_id': 'my_job', 'run_id': '12345'}
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Parses and retrieves job_id and run_id parameters from command-line arguments. | None | A dictionary with job_id and run_id. | Use when running pipelines that require job metadata from command-line inputs. |
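One possible variation, shown here as a sketch rather than the repository's implementation: accepting an explicit argument list makes the parser testable, and `parse_known_args` tolerates extra arguments that a job launcher might append. `parse_job_params` and the `--extra` flag are hypothetical.

```python
import argparse
from typing import Any, Optional, Sequence


def parse_job_params(argv: Optional[Sequence[str]] = None) -> dict[str, Any]:
    """Parse --job_id and --run_id; argv=None falls back to sys.argv as argparse normally does."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--job_id")
    parser.add_argument("--run_id")
    # Unrecognized arguments are collected separately instead of raising an error
    known, _unknown = parser.parse_known_args(argv)
    return vars(known)


params = parse_job_params(["--job_id", "my_job", "--run_id", "12345", "--extra", "x"])
print(params)  # {'job_id': 'my_job', 'run_id': '12345'}
```

Values arrive as strings, so a numeric `run_id` needs an explicit conversion if the caller requires an integer.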
get_spark() ℹ️#
def get_spark() -> SparkSession:
"""
Returns the SparkSession object required to interact with Spark.
Uses Databricks' SparkSession builder to get or create the session.
:return: SparkSession instance
"""
return DatabricksSession.builder.getOrCreate() # Retrieve or initialize a SparkSession
# Example Usage
# Step 1: Call `get_spark` to initialize a SparkSession
spark = get_spark() # Input: None, Output: SparkSession instance
# Step 2: Use the SparkSession to run Spark operations
print(spark) # Output: The current SparkSession instance
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Creates or retrieves the SparkSession, the entry point for Spark operations like queries, transformations, and distributed computation. | None | A SparkSession instance | At the start of any pipeline that requires Spark functionality. |
get_dbutils() ℹ️#
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Provides access to DBUtils for filesystem operations, secrets management, and library handling. Simplifies handling of files and secure credentials in Databricks. | None | A DBUtils object | When interacting with the Databricks Filesystem (DBFS), or securely accessing credentials via secrets. |
Common Functions Cheat Sheet#
| Function Name | What It Does | Input Parameters | Output | When to Use |
|---|---|---|---|---|
| get_spark() | Creates or retrieves the SparkSession for interaction with Spark. | None | SparkSession | At the start of pipelines requiring Spark functionality. |
| get_dbutils() | Provides access to DBUtils for managing files and secrets securely. | None | DBUtils | For filesystem operations or securely managing credentials. |
| parse_environment_name() | Ensures the environment name is valid before execution. | env (string) | Valid env name | Before starting pipelines to validate environment input. |
| run() | Executes SQL queries sequentially, logging each for traceability. | svcs, statements | None | To set up schemas or programmatically execute SQL workflows. |
| get_job_run_id_params() | Parses and retrieves job_id and run_id from command-line arguments. | None | Dictionary with job_id and run_id | Use when job metadata like job_id or run_id is required for execution. |
Flow of Execution in a Pipeline#
Here’s how the modular components come together during a typical file ingestion pipeline execution:
1. Initialize Configuration:
   - The pipeline starts by loading the Config file based on the selected mode and env.
   - Config outputs paths, schemas, table definitions, and validation information.
2. Prepare Infrastructure:
   - Infrastructure leverages Config to:
     - Create schemas and views dynamically.
     - Set up directories (landing zone, checkpoint, failure logs) via create_directories().
3. Enable Supporting Utilities:
   - Utilities provide logging, schema-to-SQL conversions, and path handling for pipeline tasks.
   - Example: get_logger() logs pipeline events during schema and data preparation. sql_convert_schema_to_create_table() generates SQL queries.
4. Execute Core Logic:
   - Common functions (run, get_spark) execute SQL instructions and enable parallel data processing.
Inputs and Outputs in File Ingestion Pipelines#
In the realm of data engineering, clarity on inputs and outputs forms the cornerstone of an effective file ingestion pipeline. As the first step in the data lifecycle, ingestion often handles a wide variety of file formats, data types, and infrastructure nuances. Without a clear understanding of the exact inputs and outputs required for these pipelines, the process can quickly become error-prone, inefficient, and difficult to maintain.
Knowing precisely what data needs to be ingested (inputs) and what format or structure it must ultimately take (outputs) forms the foundation for designing robust ingestion pipelines. Inputs define the raw data—its source, structure, quality, and relevant metadata—while outputs determine how the ingested data is stored, processed, and made available for downstream systems or analyses. Misalignment in this area can lead to significant issues, such as schema mismatches, incomplete transformations, or unmet business requirements.
By establishing well-defined inputs and outputs, data engineers can:
- Ensure Consistency: Define clear data contracts to describe the structure, quality, and expectations for both inputs and outputs, reducing the risk of discrepancies.
- Drive Automation: Streamline the ingestion process with automated validation, transformation, and data delivery based on agreed-upon configurations.
- Improve Scalability: Create modular pipelines that can easily adapt to changes in data sources or business needs while maintaining downstream reliability.
- Enable Collaboration: Provide clarity for stakeholders like data scientists, analysts, and operational teams about the available data and its usability.
Taking the necessary time to map out inputs and outputs transforms ambiguity into action, reduces technical debt, and provides a foundation for reliable, scalable, and future-proof pipelines. The sections below delve deeper into defining inputs and outputs, connecting them to configuration, and handling challenges with diverse file ingestion use cases.
The pipeline execution starts by processing inputs, which drive how the ingestion workflow behaves, and ends by producing outputs, which are consumed downstream for further processing, validation, or reporting. The Config acts as the main source for inputs and dynamically influences outputs.
Inputs#
The inputs are key parameters passed to the pipeline. They control the workflow’s behavior, enabling dynamic adaptability across environments such as dev, test, or prod.
How Configuration Acts as an Input#
Configuration files encapsulate all input definitions, acting as a centralized source to manage input parameters. By abstracting critical pipeline variables, the configuration ensures consistency and flexibility.
Key ways the configuration serves as an input:
1. Dynamic Path Resolution:
- Input paths like incoming_path and checkpoint incorporate environment details, allowing seamless switching between dev, test, or prod environments without code changes.
- Example configuration:
  {
    "landing_volume_path": "abfss://landing@storage.dfs.core.windows.net",
    "incoming_path": "incoming_files",
    "failed_path": "failed_files"
  }
- Resolving the full incoming path in Python:
  incoming_path = f"{config['landing_volume_path']}/{config['incoming_path']}"
2. Environment Context:
- Parameters like env adjust pipeline behavior based on the target environment (e.g., development vs. production).
- Example: In production (prod), stricter quality checks may be enforced, while in development (dev), some validations may be skipped.
3. Schema and Table Specifications:
- Configurations define which tables need to be created dynamically and their respective schemas.
- Example: The catalog and schema_base organize tables logically in Databricks’ Unity Catalog.
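The path resolution described above can be sketched in plain Python. This is a minimal illustration, not the playbook's actual Config Module: the per-environment dictionary, the storage account names, and the helper names `join_path` and `resolve_paths` are all assumptions made for the example.

```python
# Hypothetical per-environment settings; the storage account names are
# placeholders, not values taken from this playbook.
CONFIGS = {
    "dev": {
        "landing_volume_path": "abfss://landing@devstorage.dfs.core.windows.net",
        "incoming_path": "incoming_files",
        "failed_path": "failed_files",
    },
    "prod": {
        "landing_volume_path": "abfss://landing@prodstorage.dfs.core.windows.net",
        "incoming_path": "incoming_files",
        "failed_path": "failed_files",
    },
}


def join_path(base: str, *parts: str) -> str:
    """Join path segments with single slashes, leaving the URI scheme intact."""
    return "/".join([base.rstrip("/")] + [p.strip("/") for p in parts if p])


def resolve_paths(env: str) -> dict:
    """Return the fully qualified incoming and failed paths for one environment."""
    if env not in CONFIGS:
        raise ValueError(f"Unknown environment: {env!r}")
    config = CONFIGS[env]
    root = config["landing_volume_path"]
    return {
        "incoming": join_path(root, config["incoming_path"]),
        "failed": join_path(root, config["failed_path"]),
    }


# The calling code is identical for every environment; only `env` changes.
for env in ("dev", "prod"):
    print(env, resolve_paths(env)["incoming"])
```

Because the environment is only a lookup key, moving from dev to prod needs no change to the ingestion code itself, which is the point of keeping paths in configuration.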
List of Inputs#
Here are the primary inputs used in the pipeline:
1. Pipeline Mode: Batch or Streaming operation.
2. Environment Settings: Information about the runtime environment (e.g., dev, test, or prod).
3. Data Sources and Paths: Incoming file paths, landing zones, and checkpoint directories.
4. Configuration Settings: Flags for validations, catalogs, schemas, and dynamic table generation.
Input Details#
| Input Name | Description |
|---|---|
| mode | Specifies the pipeline ingestion technique: Autoloader, PySpark, or DLT streaming. |
| env | Determines the runtime environment, such as dev, test, or prod. |
| catalog | Specifies the Databricks catalog in which the tables will be created. |
| schema_base | The name of the schema within the catalog to organize tables. |
| json_types | List of raw tables (in JSON schema) to be created dynamically for ingestion. |
| landing_volume_path | Path to the raw data landing zone where flat files are ingested. |
| data_provider_path | Subdirectory under the landing zone for handling provider-specific data. |
| incoming_path | Defines the folder that holds incoming files awaiting processing. |
| failed_path | Specifies the directory where files failing validation are stored. |
| checkpoint | Path for maintaining streaming checkpoints to ensure fault tolerance. |
| quality_checks | Flags for quality validations like schema validation and PK constraints. |
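One way to keep these inputs consistent is to load them into a typed object that fails fast on missing or invalid values. The sketch below is illustrative only: the class name `PipelineInputs`, the split between required and optional fields, the spellings of the allowed `mode` values, and the flag names inside `quality_checks` are assumptions, not part of the playbook.

```python
from dataclasses import dataclass, field

ALLOWED_ENVS = {"dev", "test", "prod"}
# Assumed spellings for the three ingestion techniques named in the table.
ALLOWED_MODES = {"autoloader", "pyspark", "dlt_streaming"}
# Assumption: these inputs are treated as mandatory in this sketch.
REQUIRED_KEYS = (
    "mode", "env", "catalog", "schema_base",
    "landing_volume_path", "incoming_path", "failed_path", "checkpoint",
)


@dataclass(frozen=True)
class PipelineInputs:
    """Typed view of the inputs listed in the table above."""
    mode: str
    env: str
    catalog: str
    schema_base: str
    landing_volume_path: str
    incoming_path: str
    failed_path: str
    checkpoint: str
    data_provider_path: str = ""
    json_types: list = field(default_factory=list)
    quality_checks: dict = field(default_factory=dict)

    def __post_init__(self):
        # Reject values outside the known environments and modes.
        if self.env not in ALLOWED_ENVS:
            raise ValueError(f"env must be one of {sorted(ALLOWED_ENVS)}")
        if self.mode not in ALLOWED_MODES:
            raise ValueError(f"mode must be one of {sorted(ALLOWED_MODES)}")


def load_inputs(raw: dict) -> PipelineInputs:
    """Validate a raw config dict and convert it to PipelineInputs."""
    missing = [key for key in REQUIRED_KEYS if key not in raw]
    if missing:
        raise KeyError(f"Missing required inputs: {missing}")
    # Unknown keys make the dataclass constructor raise TypeError.
    return PipelineInputs(**raw)


inputs = load_inputs({
    "mode": "autoloader",
    "env": "dev",
    "catalog": "dev_catalog",        # hypothetical catalog name
    "schema_base": "bronze_dev",     # hypothetical schema name
    "landing_volume_path": "abfss://landing@storage.dfs.core.windows.net",
    "incoming_path": "incoming_files",
    "failed_path": "failed_files",
    "checkpoint": "checkpoints",
    # Hypothetical flag names for the quality checks.
    "quality_checks": {"schema_validation": True, "pk_constraints": False},
})
print(inputs.env, inputs.quality_checks)
```

Validating at load time means a misspelled environment or a missing path surfaces before any data is read, rather than partway through an ingestion run.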
Outputs#
The outputs are the final results produced by the pipeline. These outputs are critical for downstream workflows such as analytics, reporting, and auditing.
List of Outputs#
The pipeline generates the following outputs:
1. Delta Tables: Store processed data for downstream processing.
2. Validation Logs: Track schema compliance and data quality checks.
3. Successfully Processed Files: A list of ingested files.
4. Checkpoints: Ensure fault tolerance in streaming pipelines.
5. Bad Records Directory: Holds files that failed validation for auditing.
Output Details#
| Output Name | Description |
|---|---|
| Delta Table | A Delta table populated with validated and enriched records. |
| Validation Logs | Captures validation results, including schema compliance and data quality statuses. |
| Successfully Loaded Data | Stores paths of validated and processed files post-ingestion. |
| Checkpoints Path | A directory for maintaining checkpoints to track ingestion progress, especially in streaming pipelines. |
| Bad Records Directory | Location for files that didn’t pass ingestion validations, for further inspection or audit. |
How Configuration Influences Outputs#
Like inputs, outputs are also defined and influenced dynamically through configuration files. Examples include:
- Delta table definitions (catalog, schema_base) map directly into a structured data lake.
- The validation log and bad-record locations are assigned from configuration (failed_path, validation_logs_path).
- The streaming checkpoint location (checkpoints_path) is also set in configuration, which keeps incremental reads fault-tolerant.
Example Configuration for Outputs: ℹ️#
{
  "databricks": {
    "catalog": "prod_catalog",
    "schema_base": "bronze_prod",
    "delta_tables": [
      {
        "name": "transactions",
        "partition_by": "date",
        "storage_path": "transactions/"
      }
    ],
    "failed_path": "failed_files/",
    "checkpoints_path": "checkpoints/"
  }
}
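To show how such a configuration could drive the outputs, the sketch below derives fully qualified table names and output locations from it. It assumes the relative paths in the config are resolved against a single root location; the `base_path` value and the helper names `under_base` and `describe_outputs` are hypothetical.

```python
import json

CONFIG = json.loads("""
{
  "databricks": {
    "catalog": "prod_catalog",
    "schema_base": "bronze_prod",
    "delta_tables": [
      {"name": "transactions", "partition_by": "date", "storage_path": "transactions/"}
    ],
    "failed_path": "failed_files/",
    "checkpoints_path": "checkpoints/"
  }
}
""")


def under_base(base_path: str, relative: str) -> str:
    """Resolve a relative config path against the assumed root location."""
    return f"{base_path.rstrip('/')}/{relative.strip('/')}"


def describe_outputs(config: dict, base_path: str) -> dict:
    """Derive table names and output locations from the config."""
    db = config["databricks"]
    tables = [
        {
            # Three-level Unity Catalog name: catalog.schema.table
            "full_name": f"{db['catalog']}.{db['schema_base']}.{table['name']}",
            "partition_by": table["partition_by"],
            "location": under_base(base_path, table["storage_path"]),
        }
        for table in db["delta_tables"]
    ]
    return {
        "tables": tables,
        "failed_path": under_base(base_path, db["failed_path"]),
        "checkpoints_path": under_base(base_path, db["checkpoints_path"]),
    }


# Placeholder root; the example config only holds relative paths.
outputs = describe_outputs(CONFIG, "abfss://bronze@storage.dfs.core.windows.net")
print(outputs["tables"][0]["full_name"])  # prod_catalog.bronze_prod.transactions
print(outputs["checkpoints_path"])
```

In a Databricks notebook, the derived `full_name` and `partition_by` values could then be handed to a Delta write, for example through `partitionBy` and `saveAsTable` on a DataFrame writer, so that table placement stays entirely configuration-driven.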