Skip to content

💡 Hover Feature

Look for ℹ️ icons in titles - hover over them to reveal additional information.

Go to Playbook Main Page
Next: File Ingestion Using Autoloader(Python) Tutorial
Back: Back to previous page

Landing to bronze#

The landing_to_bronze script enables efficient, scalable, and fault-tolerant ingestion of raw files from the landing zone into the bronze layer of a data lake. The bronze layer serves as the foundational repository for structured raw data, ensuring fidelity while preparing data for downstream transformations (silver and gold layers).

This document breaks down the script's components, functionality, and required dependencies, providing a clear framework for understanding and extending the script.


Assumptions#

  1. The Databricks infrastructure is pre-configured, including:

    • Access to schemas, directories, paths, and permissions for the bronze layer.
    • Properly initialized Spark sessions
    • Logging utilities and runtime arguments configured.
  2. Environment-specific configurations (dev, staging, prod) are stored externally and dynamically fetched by the script.

  3. This pipeline processes raw files and may require predefined schemas in the bronze layer.

Note:
If these prerequisites are not met, refer to the Modular Components document in the Reference section for detailed instructions on setting up the infrastructure, schemas, and common utilities.


Key Features#

Feature Description
Environment-Driven Execution Fetches configurations (e.g., paths, tables) dynamically for each environment (dev, prod).
Databricks Autoloader Utilizes Databricks Autoloader for schema inference, efficient ingestion, and incremental loads.
Parallel/Sequential Modes Supports both parallel ingestion (via multithreading) and sequential ingestion modes.
Dynamic Infrastructure Setup Automatically creates schemas, directories, and checkpoint locations for the bronze layer.
Logging and Traceability Logs metadata like job_id, run_id, table names, and failures for easy debugging and monitoring.

1.landing_to_bronze.py#

Handles the ingestion of raw data from the landing zone into the Bronze layer, enriching it with audit fields and writing it to Delta tables.ℹ️
"""Entrypoint module"""
from conf.constants import Layers
from dhopoc1 import databricks_utils, databricks_utils_bronze, databricks_infrastructure
from conf import databricks_config
import concurrent.futures


def landing_to_bronze_dev():
    """Entrypoint that runs the landing-to-bronze ingestion for the "dev" environment."""
    landing_to_bronze(env="dev")


def landing_to_bronze(env: str) -> None:
    """
    Loads raw json files from landing zone into bronze.

    Fetches the Databricks services and the bronze autoloader configuration for ``env``,
    prepares the schema/directories through ``databricks_infrastructure.create_schema`` and
    then calls ``databricks_utils_bronze.autoloader_data`` for every table listed in
    ``conf["databricks"]["json_types"]`` - in a thread pool when
    ``conf["databricks"]["parallel_processing"]`` is truthy, otherwise one table at a time.

    In parallel mode a failing table is logged and the remaining tables keep running;
    in sequential mode an exception from ``autoloader_data`` propagates to the caller.

    :param env: Environment type
    :return: None
    """

    svcs = databricks_utils.get_svcs(env=env)
    conf = databricks_config.get_databricks_configuration(env=env, mode="autoloader", layer=Layers.Bronze.value)

    # prepare databricks infra
    databricks_infrastructure.create_schema(
        svcs=svcs,
        conf=conf,
        create_dirs=True
    )

    args = svcs['args']
    svcs["logger"].info(f"Databricks execution: JOBID:{args['job_id']} RUN_JOB_ID:{args['run_id']}")

    table_names = conf["databricks"]["json_types"]
    parallel_processing = bool(conf["databricks"].get("parallel_processing", False))

    if parallel_processing:
        # Using ThreadPoolExecutor to run tasks in parallel
        parallel_workers = int(conf["databricks"].get("max_parallel_processes", 10))

        with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_workers) as executor:
            futures = {executor.submit(
                databricks_utils_bronze.autoloader_data, svcs, conf, table_name, args
            ): table_name for table_name in table_names}

            # Collect and process
            for future in concurrent.futures.as_completed(futures):
                table_name = futures[future]
                try:
                    # result() re-raises whatever the worker raised; without this call a
                    # failed table would be silently reported as completed.
                    future.result()
                except Exception as e:
                    svcs['logger'].error(f"Data loading failed for {table_name}: {e}")
                else:
                    svcs['logger'].info(f"Data loading completed for {table_name}")

    else:
        # regular execution: item in loop, one by one
        for table_name in table_names:
            svcs.get("logger").info(f"Loading data into bronze for table {table_name}")
            databricks_utils_bronze.autoloader_data(svcs=svcs, conf=conf, table_name=table_name, args=args)

2.quality_bronze.py#

Validates the data in the Bronze layer by performing schema checks, key validations, and harmonization to ensure data quality before further processing.ℹ️
"""Bronze layer quality checks"""
from conf.constants import Layers
from dhopoc1 import databricks_utils, databricks_utils_bronze
from conf import databricks_config
import concurrent.futures


def quality_bronze_dev():
    """Entrypoint that runs the bronze quality checks for the "dev" environment."""
    quality_bronze(env="dev")


def quality_bronze(env: str) -> None:
    """
    Runs the bronze quality checks for every configured table.

    Fetches the Databricks services and the bronze autoloader configuration for ``env`` and
    calls ``databricks_utils_bronze.quality_checks`` for every table listed in
    ``conf["databricks"]["json_types"]``, passing ``conf["databricks"]["quality_checks"]`` as
    the set of enabled checks. Tables are processed in a thread pool when
    ``conf["databricks"]["parallel_processing"]`` is truthy, otherwise one at a time.

    In parallel mode a failing table is logged and the remaining tables keep running;
    in sequential mode an exception from ``quality_checks`` propagates to the caller.

    :param env: Environment type
    :return: None
    """

    svcs = databricks_utils.get_svcs(env=env)
    conf = databricks_config.get_databricks_configuration(env=env, mode="autoloader", layer=Layers.Bronze.value)

    enable_checks = conf["databricks"]["quality_checks"]
    table_names = conf["databricks"]["json_types"]
    parallel_processing = bool(conf["databricks"].get("parallel_processing", False))

    if parallel_processing:
        # Using ThreadPoolExecutor to run tasks in parallel
        parallel_workers = int(conf["databricks"].get("max_parallel_processes", 10))

        with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_workers) as executor:
            futures = {executor.submit(
                databricks_utils_bronze.quality_checks, svcs, conf, table_name, enable_checks
            ):
                table_name for table_name in table_names}

            # Collect and process
            for future in concurrent.futures.as_completed(futures):
                table_name = futures[future]
                try:
                    # result() re-raises whatever the worker raised; without this call a
                    # failed table would be silently reported as completed.
                    future.result()
                except Exception as e:
                    svcs['logger'].error(f"Quality checks failed for {table_name}: {e}")
                else:
                    svcs['logger'].info(f"Quality checks completed for {table_name}")

    else:
        # regular execution: item in loop, one by one
        for table_name in table_names:
            databricks_utils_bronze.quality_checks(
                svcs=svcs,
                conf=conf,
                table_name=table_name,
                enable_checks=enable_checks
            )

databricks_utils_bronze.py#

This is the utility file used by the `landing_to_bronze.py` and `quality_bronze.py` jobs. It provides essential functionality for the Bronze layer, including data ingestion, metadata enrichment, Slowly Changing Dimensions (SCD) updates, and data validation.ℹ️
from typing import Any
from conf.quality_checks_config import QUALITY_CHECKS
from pyspark.sql.types import StringType, StructType, TimestampType
from pyspark.sql.functions import (current_timestamp, col, lit, md5, concat_ws, lag, row_number, min, sum,
                                   when, udf, coalesce)
from pyspark.sql import Window
from pyspark.sql import DataFrame, functions as F
import uuid
from dhopoc1 import databricks_quality_framework as dqf
from conf.schemas import JSON_SCHEMAS, JSON_SCHEMAS_SYS, METADATA_SCHEMA
from conf.constants import Constants


def generate_uuid():
    """Return a newly generated random UUID (``uuid.uuid4``) in its string form."""
    new_id = uuid.uuid4()
    return str(new_id)


def update_sys_modified_at_on_hash_change(svcs: dict[Any, Any], table_name: str, pk_columns: list[str],
                                          mapped: str, run_id: str) -> None:
    """
    Recomputes sys_modified_at for a Delta table and merges the corrected values back into it.

    Rows are grouped per primary key and ordered by sys_received_at. A new "version" starts on
    the first row of a key and every time sys_hash_key differs from the previous row. For rows of
    the first version the new sys_modified_at is the ``mapped`` column cast to timestamp; for later
    versions it is the earliest sys_received_at within that version. Only rows whose current
    sys_modified_at differs from the computed value are merged back (matched on the PK columns
    plus sys_received_at).

    Parameters:
    :param run_id: run job id of the current batch; only referenced by the disabled
        (commented-out) statement at the end of this function, otherwise unused
    :param mapped: name of the source column used as sys_modified_at for the first version of a key
    :param svcs: Databricks services; the 'spark' and 'logger' entries are used here
    :param table_name: Fully qualified table name (e.g., 'db.schema.table')
    :param pk_columns: List of primary key column names (e.g., ['pk1', 'pk2'])
    :return: None
    """

    # Load your Delta table as a DataFrame
    df = svcs.get('spark').table(table_name)

    # Partition by PK, order by sys_received_at
    w = Window.partitionBy(*pk_columns).orderBy("sys_received_at")
    # Find where hash changes (1 if change, else 0)
    # NOTE: the first row of a partition has no previous row for lag() to compare against,
    # which is why the first row is flagged separately below.
    hash_change = (col("sys_hash_key") != lag("sys_hash_key", 1).over(w)).cast("int")
    # For first row, treat as version start
    is_first_row = row_number().over(w) == 1
    # Build version number by cumulative sum of changes (and first row)
    # NOTE: `sum` (and `min` further down) are the pyspark.sql.functions aggregates imported
    # at module level, not the Python builtins.
    version_num = sum(
        when(is_first_row | (hash_change == 1), 1).otherwise(0)
    ).over(w.rowsBetween(Window.unboundedPreceding, 0))
    # Attach version number
    df = df.withColumn("version_num", version_num)
    # Find the min sys_received_at for each version group
    version_window = Window.partitionBy(*(pk_columns + ["version_num"]))
    # First version keeps the source-provided timestamp (`mapped` column); later versions
    # take the earliest sys_received_at of their version group.
    df = df.withColumn("new_sys_modified_at",
                       when(
                           col("version_num") == 1,
                           col(mapped).cast(TimestampType())
                       ).otherwise(
                           min("sys_received_at").over(version_window)
                       )
                       )

    # Only update if sys_modified_at differs from the new value
    # (a NULL sys_modified_at is compared as the literal '1970-01-01 00:00:00')
    updates = df.filter(coalesce(col("sys_modified_at"), lit('1970-01-01 00:00:00')) != col("new_sys_modified_at")) \
        .select(*pk_columns, "sys_received_at", "new_sys_modified_at") \
        .dropDuplicates(pk_columns + ["sys_received_at"])

    # Temp view name is derived from the last part of the (backtick-stripped) table name
    temp_view_name = "tmpview_"+table_name.replace("`","").split(".")[-1]
    svcs['logger'].info(f'Createing tmp view {temp_view_name} for {table_name}')
    updates.createOrReplaceTempView(temp_view_name)
    # Match target rows on every PK column plus sys_received_at
    join_cond = " AND ".join(
        [f"target.{c} = updates.{c}" for c in pk_columns] + ["target.sys_received_at = updates.sys_received_at"])

    sql = f"""
        MERGE INTO {table_name} AS target
        USING {temp_view_name} AS updates
        ON {join_cond}
        WHEN MATCHED THEN
          UPDATE SET target.sys_modified_at = cast(updates.new_sys_modified_at as timestamp)
        """
    svcs.get("logger").info(sql)
    svcs.get('spark').sql(sql)

    # NOTE(review): the statement below is disabled; it is the only place `run_id` would be used.
    # # update sys_received_at after sys_modified_at is applied
    # sql_sys_received_at = f"""
    #     update {table_name}
    #     set
    #         sys_received_at = current_timestamp()
    #         where sys_job_id = '{run_id}'
    # """
    # svcs.get("logger").info(sql_sys_received_at)
    # svcs.get('spark').sql(sql_sys_received_at)


def add_sys_received_at_from_filename(
        df: DataFrame,
        file_name_col: str,
        table_name: str = "_metadata.file_name",
        output_col: str = "sys_received_at"
) -> DataFrame:
    """Derive a received-at timestamp from the file name and store it in ``output_col``.

    The file name is searched for ``<table_name>_<yyyy-MM-dd_HH_mm_ss>``; the captured part is
    parsed as a timestamp. When nothing is captured (null or empty string) the value of
    ``_metadata.file_modification_time`` is used instead.

    :param df: Input DataFrame; must expose a ``_metadata`` column for the fallback
    :param file_name_col: Name of the column containing the file name (e.g., '_metadata.file_name')
    :param table_name: Table name prefix expected in front of the timestamp in the file name
    :param output_col: Name of the output timestamp column (default: 'sys_received_at')
    :return: DataFrame with the timestamp column added
    """
    stamp_regex = f"{table_name}_(\\d{{4}}-\\d{{2}}-\\d{{2}}_\\d{{2}}_\\d{{2}}_\\d{{2}})"
    raw_stamp = F.regexp_extract(F.col(file_name_col), stamp_regex, 1)

    # regexp_extract found nothing usable -> fall back to the file modification time
    extraction_failed = (raw_stamp.isNull()) | (raw_stamp == "")
    resolved_ts = F.when(
        extraction_failed, F.col("_metadata").file_modification_time
    ).otherwise(
        F.to_timestamp(raw_stamp, "yyyy-MM-dd_HH_mm_ss")
    )
    return df.withColumn(output_col, resolved_ts)


def autoloader_data(svcs: dict[Any, Any], conf: dict[str, Any], table_name: str, args: dict[str, str]) -> None:
    """
    Loads input json files into bronze table.

    Reads the table's landing directory with Auto Loader (``cloudFiles`` / json), adds the
    ``sys_*`` audit columns, appends the result to the bronze Delta table and finally
    recalculates ``sys_modified_at`` via ``update_sys_modified_at_on_hash_change``.
    The ``sys_modified_at`` recalculation is best-effort: a failure there is logged, not raised.

    :param args: asset bundle job parameters dict (``run_id`` is read from it)
    :param svcs: Databricks services (``spark`` and ``logger`` are used)
    :param conf: Databricks configuration
    :param table_name: name of table to be created
    :raises Exception: when ``conf["databricks"]["map_created_at"]`` has no entry for ``table_name``
    :return:None
    """
    # configs
    layer = str(conf["databricks"]["layer"])
    catalog = conf["databricks"]["catalog"]
    schema = f"{conf['databricks']['schema_base']}"
    incoming_path = conf["databricks"]["incoming_path"]
    failed_path = conf["databricks"]["failed_path"]
    landing_volume = conf["databricks"]["landing_volume"]
    checkpoint = conf["databricks"]["checkpoint"]
    map_created_at = conf["databricks"]["map_created_at"]
    target_table = f"`{catalog}`.`{schema}`.`{table_name}`"
    run_id = args["run_id"]
    svcs.get("logger").info(f"Execution details {args}")

    # paths
    source_path = f"/Volumes/{catalog}/{schema}/{landing_volume}/{incoming_path}/{table_name}/"
    checkpoint_path = f"/Volumes/{catalog}/{schema}/{landing_volume}/{checkpoint}/{table_name}/"
    bad_files_path = f"/Volumes/{catalog}/{schema}/{landing_volume}/{failed_path}/{table_name}/"
    svcs.get("logger").info(f"Loading data from {source_path}")

    # schemas
    table_json_schema = JSON_SCHEMAS.get(layer.upper(), {}).get(table_name)
    metadata_autoloader_schema = METADATA_SCHEMA

    # Register the UDF
    uuid_udf = udf(generate_uuid, StringType())

    # read source data
    df = (
        svcs.get("spark")
        .readStream.format("cloudFiles")
        .option("multiline", True)
        .option("cloudFiles.format", "json")
        .schema(table_json_schema)
        .option("cloudFiles.schemaLocation", checkpoint_path)
        .option("badRecordsPath", bad_files_path)
        .load(source_path)
        .select("*", "_metadata")
    )

    # basic checks ADD MORE if needed
    df = df.filter(col("_metadata").isNotNull())
    df = df.withColumn("clean_metadata", col("_metadata").cast(metadata_autoloader_schema))
    df = df.drop("_metadata").withColumnRenamed("clean_metadata", "_metadata")

    # grab business columns (everything except _metadata, captured before sys_* columns are added)
    bc_columns = df.drop("_metadata").columns

    # Expanding the DataFrame with new columns initialized to None
    for field in JSON_SCHEMAS_SYS.get('sys').fields:
        df = df.withColumn(field.name, lit(None).cast(field.dataType))
    # sys_received_at is extracted from file name and casted to timestamp
    df = add_sys_received_at_from_filename(df=df,
                                           file_name_col="_metadata.file_name",
                                           table_name=table_name)

    df = df.withColumn("sys_is_deleted", lit(False))
    df = df.withColumn("sys_job_id", lit(run_id))
    df = df.withColumn("sys_source_filename", df["_metadata"].file_name)
    df = df.withColumn("sys_source_name", lit('kakao'))

    # mapping for sys_modified_at, timestamp modification at source
    col_map_value = map_created_at.get(table_name)
    if not col_map_value:
        raise Exception(f"Table {table_name} not created")
    df = df.withColumn("sys_modified_at", col(col_map_value).cast(TimestampType()))
    # sys_hash_key is the md5 of all business columns joined with "|"
    df = df.withColumn("concat_tmp", concat_ws("|", *[col(c) for c in bc_columns]))
    df = df.withColumn("sys_hash_key", md5(col("concat_tmp"))).drop("concat_tmp")
    df = df.withColumn("sys_surrogate_pk", uuid_udf())

    # write stream into table
    query = (
        df.writeStream.format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .trigger(once=True)  # batch
        .toTable(target_table)
    )

    # wait till micro batch stored
    query.awaitTermination()

    # update modified_at (best-effort: failures are logged, not raised)
    try:
        # get processed files in this autoloader batch
        svcs.get("logger").info(f'start doing update sys_modified_at for {run_id}')

        pks = QUALITY_CHECKS.get(Constants.PKS, {})
        pk_cols = pks.get(table_name)
        if pk_cols is None:
            # No PK definition for this table: report it explicitly instead of failing with
            # an opaque "'NoneType' object is not iterable" in the comprehension below.
            svcs.get("logger").error(
                f"SCD NOT applied with ERROR on {target_table}: no primary key columns configured"
            )
            return
        # sys_received_at is the ordering column of the SCD window, so it is not a partition key
        part_cols = [col_name for col_name in pk_cols if col_name != "sys_received_at"]
        update_sys_modified_at_on_hash_change(svcs=svcs,
                                              table_name=target_table,
                                              pk_columns=part_cols,
                                              mapped=col_map_value,
                                              run_id=run_id)
    except Exception as e:
        svcs.get("logger").error(f"SCD NOT applied with ERROR on {target_table}: {e}")
    else:
        # Only report success when the update really ran without an error
        svcs.get("logger").info(f"SCD applied on {target_table}")


def quality_checks(svcs: dict[Any, Any], conf: dict[str, Any], table_name: str,
                   enable_checks: dict[str, str] = None) -> None:
    """
    Runs the enabled quality checks of the quality framework against one bronze table.

    Depending on the flags in ``enable_checks`` the following steps are executed in order:
    clear/set quality flags, schema check, primary key checks, lookup/reference checks,
    foreign key checks, harmonization checks, max value length checks and SCD activation.
    When the schema check is enabled and fails, the remaining checks are skipped for the table.

    :param table_name: Name of table to be checked
    :param enable_checks: Dictionary with information about enabled checks; ``None`` (the
        default) is treated as an empty dictionary, i.e. no optional check is enabled
    :param svcs: Databricks services
    :param conf: Databricks configuration
    :return:None
    """
    # The default is None; normalise it so the `.get` lookups below cannot fail
    enable_checks = enable_checks or {}

    # prepare config for table
    layer = conf["databricks"]["layer"]
    catalog = conf["databricks"]["catalog"]
    schema = f"{conf['databricks']['schema_base']}"
    bronze_table = f"`{catalog}`.`{schema}`.`{table_name}`"

    df = svcs['spark'].read.format("delta").table(bronze_table)
    records_for_quality_checks = df.count()
    svcs.get("logger").info(f"Quality checks on bronze table {table_name} will be executed for: "
                            f"{records_for_quality_checks}")

    pks = QUALITY_CHECKS.get(Constants.PKS, {})

    # 0. Clear status of records
    if enable_checks.get(Constants.CLEAR_STATUS, False):
        dqf.clear_quality_flags(svcs=svcs, bronze_table=bronze_table)
        svcs.get("logger").info(f"All records quality flags cleared for table: {bronze_table}")
    else:
        dqf.set_quality_flags(svcs=svcs, bronze_table=bronze_table)
        svcs.get("logger").info(f"New records quality flags cleared for table: {bronze_table}")

    # 1. Schema check
    if enable_checks.get(Constants.SCHEMA, False):

        # expected schema = business fields + sys fields + autoloader fields
        combined_schema = StructType(
            JSON_SCHEMAS[layer.upper()].get(table_name).fields
            + JSON_SCHEMAS_SYS.get('sys').fields
            + JSON_SCHEMAS_SYS.get('autoloader').fields
        )
        if not dqf.check_schema(svcs=svcs, df_or_table=df, expected_schema=combined_schema, table=bronze_table):
            svcs.get("logger").info(f"Schema mismatch detected, skipping table: {bronze_table}")
            return
        else:
            svcs.get("logger").info(f"Schema for table: {bronze_table} aligned with expected")

    # 2. PKs check
    if enable_checks.get(Constants.PKS, False):
        dqf.check_primary_key_nulls(svcs=svcs, pk_cols=pks.get(table_name), bronze_table=bronze_table)
        svcs.get("logger").info(f"PKs null for table: {bronze_table} checked")
        dqf.check_primary_key_duplicates(svcs=svcs, pk_cols=pks.get(table_name), bronze_table=bronze_table)
        svcs.get("logger").info(f"PKs duplicates for table: {bronze_table} checked")

    # 3. lookup/reference checks
    if enable_checks.get(Constants.REFS, False):

        # check last modification: collect every reference table configured for any table
        table_names = [
            v["table"]
            for v in (
                entry for subdict in QUALITY_CHECKS.get(Constants.REFS, {}).values() for entry in subdict.values()
            )
        ]
        table_modification_time_dict = dqf.get_ref_latest_timestamp(
            svcs=svcs,
            catalog=catalog,
            table_names=table_names
        )

        refs = QUALITY_CHECKS.get(Constants.REFS, {})
        for ref_key, ref_value in refs.get(table_name, {}).items():
            svcs.get("logger").info(f"Validating ref for table: {bronze_table} ref: {ref_key}")
            dqf.check_lookup_values(
                svcs=svcs,
                bronze_table=bronze_table,
                bronze_col_name=ref_key,
                lookup_table=f"{catalog}.{ref_value.get('table')}",
                last_ref_modified=table_modification_time_dict[
                    f"{ref_value.get('table')}"
                ],
                lookup_col_name=ref_value.get('field')
            )
            svcs.get("logger").info(f"REF for table: {bronze_table} and {ref_key} checked")

    # 4. FK checks
    if enable_checks.get(Constants.FKS, False):
        fks = QUALITY_CHECKS.get(Constants.FKS, {})
        for ref_key, ref_value in fks.get(table_name, {}).items():
            svcs.get("logger").info(f"Validating FK for table: {bronze_table} ref: {ref_key}")
            dqf.check_foreign_key(
                svcs=svcs,
                bronze_table=bronze_table,
                bronze_col_fk=ref_key,
                remote_table=f"{catalog}.{schema}.{ref_value.get('table')}",
                remote_col_pk=ref_value.get('field')
            )
            svcs.get("logger").info(f"FK for table: {bronze_table} and {ref_key} checked")

    # 5. Harmonization
    if enable_checks.get(Constants.HARMONIZATION, False):
        harm = QUALITY_CHECKS.get(Constants.HARMONIZATION, {})
        for harm_key, harm_value in harm.get(table_name, {}).items():
            svcs.get("logger").info(f"Validating HARMONIZATION for table: {bronze_table} harmonization: {harm_key}")
            dqf.check_harmonization_values(
                svcs=svcs,
                bronze_table=bronze_table,
                bronze_col_name=harm_key,
                lookup_table=f"{catalog}.{harm_value.get('table')}",
                lookup_col_name=harm_value.get('field')
            )
            svcs.get("logger").info(f"HARMONIZATION for table: {bronze_table} and {harm_key} checked")

    # 6. Check length
    if enable_checks.get(Constants.VALUE_MAX_LENGTH, False):
        max_lengths = QUALITY_CHECKS.get(Constants.VALUE_MAX_LENGTH, {})
        for limit_key, limit_value in max_lengths.get(table_name, {}).items():
            dqf.check_max_length_field(
                svcs=svcs,
                bronze_table=bronze_table,
                column=limit_key,
                limit=limit_value
            )
            svcs.get("logger").info(f"Value length for table: {bronze_table} and {limit_key} "
                                    f"with limit {limit_value} checked")

    # 7. Activate SCD
    if enable_checks.get(Constants.SCD, False):
        dqf.calculate_active(svcs=svcs, bronze_table=bronze_table, pk_cols=pks.get(table_name))
        svcs.get("logger").info(f"SCD for table: {bronze_table} applied ")

Code Walkthrough#

Importing Required Modules ℹ️#

[From conf.constants]

• Defines the layers of processing (Bronze, Silver, Gold) in the medallion architecture.

• Declared in the shared 'constants' file.

• databricks_utils:
    • Contains core utilities like 'get_svcs', which initialize Databricks services for processing.

• databricks_utils_bronze:
    • Focused on Bronze-layer-specific tasks, such as autoloader ingestion.

• databricks_infrastructure:
    • Supports schema creation and directory preparation tasks.

• databricks_config:
    • Handles environment-specific configuration retrieval.

• concurrent.futures:
    • Standard Python library used to enable parallel execution for processing tables concurrently.

Note:
For detailed information about:
- Configuration
- Utilities
- Common Functions
- Infrastructure Functions

Please refer to the Modular Components document located in the Reference section.
from conf.constants import Layers
from dhopoc1 import databricks_utils, databricks_utils_bronze, databricks_infrastructure
from conf import databricks_config
import concurrent.futures

Entrypoint for Dev Environment ℹ️#

Acts as a shortcut to process data for the "dev" environment by calling the main landing_to_bronze function.
def landing_to_bronze_dev():
    """DEV landing to bronze: calls landing_to_bronze with the environment set to "dev"."""
    landing_to_bronze("dev")

Fetching Databricks Services and Configuration ℹ️#

[From databricks_utils]
• Initializes Databricks services:
    • spark session: Enables Spark SQL/Delta capabilities.
    • dbutils: Provides filesystem and utility functions.
    • logger: Outputs logs related to execution.
    • args: Fetches execution arguments (job_id, run_id).

[From databricks_config]
• Fetches environment-specific configuration for the Bronze layer.

Key Configuration Values:
• json_types: List of tables to process (e.g., ConsentStatus, Medication, etc.).
• parallel_processing: Boolean flag to enable concurrent processing.
• Directory paths: Includes incoming_path, failed_path, and checkpoint.
svcs = databricks_utils.get_svcs(env=env)
conf = databricks_config.get_databricks_configuration(env=env, mode="autoloader", layer=Layers.Bronze.value)

Infrastructure Setup ℹ️#

[From databricks_infrastructure]

• Handles schema and directory setup. Gets the schema and configuration details from the schema and Databricks configuration files.

• Key Operations:
    • Creates schema if not present (CREATE SCHEMA IF NOT EXISTS).
    • Creates directories (e.g., incoming, failed, checkpoint).
    • Clears existing environment if enabled (conf["databricks"]["recreate_env"]).

Note:
For detailed information about:
- Configuration
- Utilities
- Common Functions
- Infrastructure Functions

Please refer to the Modular Components document located in the Reference section.
databricks_infrastructure.create_schema(
    svcs=svcs,
    conf=conf,
    create_dirs=True
)

Logging Execution Details ℹ️#

Getting args:
• Fetched via databricks_utils.get_svcs():
    • job_id: Identifier for the Databricks pipeline.
    • run_id: Unique ID for the specific run.

Logging:
• Logs execution details using the logger from svcs.


Note:
For detailed information about:
- Configuration
- Utilities
- Common Functions
- Infrastructure Functions

Please refer to the Modular Components document located in the Reference section.
args = svcs['args']
svcs["logger"].info(f"Databricks execution: JOBID:{args['job_id']} RUN_JOB_ID:{args['run_id']}")

Processing#

Parallel Processing ℹ️#

Parallel Execution (enabled by conf["databricks"]["parallel_processing"]):
• Processes the files from Incoming directory concurrently using ThreadPoolExecutor and writes the data incrementally to Delta tables in the Bronze layer.

autoloader_data [From databricks_utils_bronze]:
• Reads data from the incoming_path and writes data to Delta table
• Uses Audit fields such as `sys_surrogate_pk` and `sys_hash_key` for deduplication, lineage tracking, and version control. 
• Since AutoLoader inherently tracks processed files using a checkpoint directory, it skips already ingested files automatically. This ensures that:
- No duplicate files are reprocessed.
- All processed files remain in the `incoming_path`, and there's no need for additional file movement to a `loaded_path`.
- Bad records are written to the bad-files directory (`failed_path`).
Note: For a detailed explanation of the autoloader_data function, see the Databricks Bronze Utility section.
if parallel_processing:
    parallel_workers = int(conf["databricks"].get("max_parallel_processes", 10))
    with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_workers) as executor:
        futures = {executor.submit(
            databricks_utils_bronze.autoloader_data, svcs, conf, table_name, args
        ): table_name for table_name in table_names}

Sequential Processing ℹ️#

• Processes each table one by one in a sequential manner, calling autoloader_data for each table.
else:
    for table_name in conf["databricks"]["json_types"]:
        databricks_utils_bronze.autoloader_data(svcs=svcs, conf=conf, table_name=table_name, args=args)

Bronze Layer Quality Checks#

The quality_bronze module serves as the final validation step for the Bronze layer pipeline. After loading raw JSON files into Delta tables via the landing_to_bronze process, this module validates that the ingested data meets all required quality standards, such as schema compliance, primary/foreign key validation, and harmonization checks.


Note on Implementation#

The process and structure of quality_bronze are almost identical to landing_to_bronze, except for the core function called for each table.

  • In landing_to_bronze, the autoloader_data function is used to ingest data into the Bronze layer.
  • In quality_bronze, the quality_checks function is used to validate the ingested data.

Core Difference#

For each table in the configuration, the following function is invoked in quality_bronze to perform quality checks:

databricks_utils_bronze.quality_checks(
    svcs=svcs,                # Databricks services (e.g., Spark session, logger)
    conf=conf,                # Environment-specific configuration
    table_name=table_name,    # Name of the current table being validated
    enable_checks=enable_checks  # Dictionary of quality checks to apply (e.g., schema, PK, FK, harmonization)
)

Databricks Bronze Utility#

The Databricks Bronze Utility module (databricks_utils_bronze) is designed to manage the ingestion of raw data into the Bronze layer of the medallion architecture. This module ensures the data is properly structured, enriched with audit data, and stored in Delta tables for further processing in the Silver and Gold layers.


Primary Responsibilities#

  • Ingest raw data from a landing zone using Databricks Autoloader.
  • Validate and enhance data by applying schema validation and adding audit fields.
  • Ensure data consistency with Slowly Changing Dimensions (SCD) updates.
  • Enable scalability through streaming and batch processing.
  • Prepare data for downstream layers by ensuring clean, governed Bronze-level data.

This utility acts as the entry point for onboarding raw data into Delta Lake, enforcing data quality checks and providing robust handling of metadata during the ingestion process.


List of Available Functions#

Here is the list of functions provided in the module along with their purpose:

Function Name Description
generate_uuid Generates a unique UUID. Used for creating surrogate primary keys (sys_surrogate_pk) during ingestion.
update_sys_modified_at_on_hash_change Updates the sys_modified_at column for records where the hash key changes (supports tracking SCD data).
add_sys_received_at_from_filename Extracts a timestamp from filenames, falling back to file modification time if unavailable.
autoloader_data Ingests JSON files into Bronze Delta tables. Enhances data with metadata columns and writes to streaming/Delta tables.
quality_checks Performs data quality checks, such as schema validation, primary/foreign key checks, and harmonization logic.

autoloader_data() ℹ️#

def autoloader_data(svcs: dict[Any, Any], conf: dict[str, Any], table_name: str, args: dict[str, str]) -> None:
    """
    Stream JSON files from the landing volume into a Bronze Delta table using Auto Loader.

    Every record is enriched with system columns (`sys_received_at`, `sys_source_filename`,
    `sys_source_name`, `sys_is_deleted`, `sys_job_id`, `sys_hash_key`, `sys_surrogate_pk`)
    and appended to the target table.

    :param svcs: Service container; must provide "spark" (SparkSession) and "logger".
    :param conf: Configuration dictionary. Its "databricks" section must contain `catalog`,
        `schema_base`, `incoming_path`, `failed_path`, `landing_volume`, `checkpoint` and `layer`.
    :param table_name: Name of the Delta Bronze table; also the sub-folder name used under the
        incoming, checkpoint and failed paths.
    :param args: Job-specific parameters; `run_id` is required and is stored in `sys_job_id`.
    :raises KeyError: If a required configuration key or `run_id` is missing.
    :raises ValueError: If no JSON schema is registered for `table_name` in the configured layer.
    """
    # Step 1: Extract catalog, schema, and other configurations
    databricks_conf = conf["databricks"]
    catalog = databricks_conf["catalog"]
    schema = databricks_conf["schema_base"]
    incoming_path = databricks_conf["incoming_path"]
    failed_path = databricks_conf["failed_path"]
    landing_volume = databricks_conf["landing_volume"]
    checkpoint = databricks_conf["checkpoint"]
    target_table = f"`{catalog}`.`{schema}`.`{table_name}`"
    run_id = args["run_id"]
    logger = svcs.get("logger")

    # Paths for data ingestion, checkpoints, and bad records
    source_path = f"/Volumes/{catalog}/{schema}/{landing_volume}/{incoming_path}/{table_name}/"
    checkpoint_path = f"/Volumes/{catalog}/{schema}/{landing_volume}/{checkpoint}/{table_name}/"
    bad_files_path = f"/Volumes/{catalog}/{schema}/{landing_volume}/{failed_path}/{table_name}/"

    # Log execution details for troubleshooting and monitoring
    logger.info(f"Execution details: {args}")
    logger.info(f"Source path: {source_path}")

    # Step 2: Extract the table schema and metadata schema
    layer_key = databricks_conf["layer"].upper()
    table_json_schema = JSON_SCHEMAS.get(layer_key, {}).get(table_name)
    if table_json_schema is None:
        # Fail early with an actionable message instead of handing `None` to `.schema()`.
        raise ValueError(f"No JSON schema registered for table '{table_name}' in layer '{layer_key}'.")
    metadata_schema = METADATA_SCHEMA

    # Step 3: Read source data with Auto Loader using the predefined schema.
    # `badRecordsPath` is where records that cannot be parsed are written.
    df = (
        svcs["spark"]
        .readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")  # Expecting JSON file format
        .schema(table_json_schema)  # Apply predefined schema for input data
        .option("cloudFiles.schemaLocation", checkpoint_path)  # Schema tracking location
        .option("badRecordsPath", bad_files_path)  # Redirect invalid JSON records
        .load(source_path)  # Load records from the specified source path
        .select("*", "_metadata")  # Include Auto Loader file metadata
    )

    # Remember the business columns now, before any system column is added, so that the
    # hash key below fingerprints only the source payload.
    business_columns = [name for name in df.columns if name != "_metadata"]

    # Step 4: Filter invalid data and enrich with metadata fields
    # Drop records that carry no file metadata.
    df = df.filter(col("_metadata").isNotNull())

    # Re-cast `_metadata` to the project's metadata schema and keep it under the same name.
    df = df.withColumn("clean_metadata", col("_metadata").cast(metadata_schema))
    df = df.drop("_metadata").withColumnRenamed("clean_metadata", "_metadata")

    # System-generated fields for lineage and traceability
    # `sys_received_at`: Timestamp for when the pipeline ingests the record
    df = df.withColumn("sys_received_at", current_timestamp())

    # `sys_source_filename`: Name of the file the record originates from
    df = df.withColumn("sys_source_filename", df["_metadata"].file_name)

    # `sys_source_name`: Name of the source system (hard-coded example value)
    df = df.withColumn("sys_source_name", lit("kakao"))

    # `sys_is_deleted`: False for all incoming records in the Bronze layer
    df = df.withColumn("sys_is_deleted", lit(False))

    # `sys_job_id`: Identifier of the pipeline run that ingested the record
    df = df.withColumn("sys_job_id", lit(run_id))

    # Step 5: Generate hash keys and surrogate keys
    # `sys_hash_key` is an MD5 fingerprint of the business columns only. Run-dependent system
    # columns (ingestion timestamp, job id, file name) are deliberately excluded; including
    # them would give the same payload a different hash on every run and break change detection.
    concat_col = concat_ws("|", *[col(name) for name in business_columns])
    df = df.withColumn("sys_hash_key", md5(concat_col))

    # `sys_surrogate_pk`: one UUID per record. The UDF is marked non-deterministic so Spark
    # does not re-evaluate or collapse it during optimization.
    uuid_udf = udf(generate_uuid, StringType()).asNondeterministic()
    df = df.withColumn("sys_surrogate_pk", uuid_udf())

    # Step 6: Append the enriched stream to the target Bronze Delta table.
    # NOTE(review): `trigger(once=True)` is kept as in the original; newer runtimes recommend
    # `availableNow=True` instead - confirm against the cluster's runtime version.
    query = (
        df.writeStream.format("delta")
        .outputMode("append")  # Incrementally append new records into the Delta table
        .option("checkpointLocation", checkpoint_path)  # Checkpoint tracks ingestion progress
        .trigger(once=True)  # Process the available data once, then stop
        .toTable(target_table)  # Save processed records into the Delta Bronze table
    )

    # Wait until the streaming query completes
    query.awaitTermination()

    # Log success message after successful ingestion and data write
    logger.info(f"Successfully loaded data into: {target_table}")

# Example Usage:
# Initialize services and configuration
services = get_svcs("dev")
conf = {
    "databricks": {
        # `layer` is required: autoloader_data uses it to look up the table's JSON schema.
        "layer": "Bronze",
        "catalog": "example_catalog",
        "schema_base": "example_schema",
        "landing_volume": "raw_data",
        "incoming_path": "incoming",
        "failed_path": "bad_records",
        "checkpoint": "checkpoints",
    }
}
args = {"run_id": "job12345"}
# Call the autoloader function
autoloader_data(services, conf, "example_table", args)
# Outputs:
# 1. Data from JSON files is read and written to the "example_table".
# 2. Invalid records are redirected to `failed_path` for troubleshooting.
# 3. Metadata fields like `sys_received_at`, `sys_surrogate_pk`, and `sys_hash_key` are added.
What It Does Input Output When to Use
Ingests JSON files into Delta Bronze tables, ensuring data quality through filtering and metadata enrichment. Applies surrogate key generation for traceability and schema validation for consistency. Tracks ingestion progress using checkpoint files for fault tolerance. svcs (dict): Core services like Logger, SparkSession, and DBUtils.
conf (dict): Configuration for paths, schema details, and metadata.
table_name (str): Target Delta table.
args (dict): Job-specific parameters, such as run_id.
None.
Processed data includes metadata fields (sys_received_at, sys_source_filename, sys_source_name, sys_is_deleted, sys_job_id, sys_hash_key, and sys_surrogate_pk) and is written to Delta Bronze tables.
Malformed records are redirected to the badRecordsPath.
Checkpoint files ensure fault tolerance for batch or streaming ingestion.
Use for fault-tolerant real-time or batch ingestion pipelines.
Ideal for processing raw data into Bronze Delta tables that require unique identifiers, metadata enrichment, and incremental ingestion.

generate_uuid() ℹ️#

def generate_uuid() -> str:
    """
    Return a freshly generated random (version 4) UUID in its canonical string form.
    """
    new_identifier = uuid.uuid4()
    return str(new_identifier)

# Example Usage:
# Generate a UUID and keep it as the identifier for a record
record_id = generate_uuid()
# Output: a random UUID string such as "3fa85f64-5717-4562-b3fc-2c963f66afa6" (different on every call)
What It Does Input Output When to Use
Generates a new Universally Unique Identifier (UUID), often used as a surrogate key for Delta table records. None A unique string identifier. Use for creating unique records, enabling deduplication or version tracking in Bronze tables.
---

add_sys_received_at_from_filename() ℹ️#

def add_sys_received_at_from_filename(
        df: DataFrame,
        file_name_col: str,
        table_name: str = "_metadata.file_name",
        output_col: str = "sys_received_at"
) -> DataFrame:
    """
    Add a timestamp column derived from the file name, falling back to the file's modification time.

    The file name is expected to contain `<table_name>_<yyyy-MM-dd_HH_mm_ss>`. When the file name
    does not match, or the matched text cannot be parsed as a timestamp, the value of
    `_metadata.file_modification_time` is used instead.

    :param df: Input DataFrame; must expose `file_name_col` and `_metadata.file_modification_time`.
    :param file_name_col: Name of the column holding the source file name.
    :param table_name: Literal prefix that precedes the timestamp in the file name.
        NOTE(review): the default value looks like it was meant for `file_name_col`; callers
        should pass the real table name explicitly - confirm intended default.
    :param output_col: Name of the column to add.
    :return: `df` with `output_col` added.
    """
    import re  # Local import: only needed to escape `table_name` for the pattern below.

    # Step 1: Build the pattern. The prefix is escaped so characters such as "." in the table
    # name are matched literally instead of being interpreted as regex syntax.
    pattern = f"{re.escape(table_name)}_(\\d{{4}}-\\d{{2}}-\\d{{2}}_\\d{{2}}_\\d{{2}}_\\d{{2}})"
    extracted = F.regexp_extract(F.col(file_name_col), pattern, 1)  # "" when there is no match
    received_at_ts = F.to_timestamp(extracted, "yyyy-MM-dd_HH_mm_ss")  # NULL when unparsable
    backup_received_at = F.col("_metadata.file_modification_time")  # Fallback value

    # Step 2: Pick the parsed timestamp when available, otherwise the fallback.
    # The `coalesce` covers file names that match the pattern but hold an invalid date/time.
    no_match = extracted.isNull() | (extracted == "")
    sys_received_at = F.when(no_match, backup_received_at).otherwise(
        F.coalesce(received_at_ts, backup_received_at)
    )
    return df.withColumn(output_col, sys_received_at)

# Example Usage:
# Read JSON files, then add a sys_received_at column derived from each record's file name.
df = spark.read.format("json").load("/path/to/data")
df_with_sys_received_at = add_sys_received_at_from_filename(
    df=df,
    file_name_col="_metadata.file_name",  # column that holds the source file name
    table_name="my_table"  # prefix expected directly before the timestamp in the file name
)
# The resulting DataFrame has a new timestamp column ("sys_received_at", the default output_col).
What It Does Input Output When to Use
Extracts timestamps from filenames or _metadata. Defaults to file modification time if no valid timestamp is found. df (DataFrame): Input DataFrame.
file_name_col (str): Column containing file names.
table_name (str): Source table prefix.
output_col (str): Column for extracted timestamp.
Returns the input DataFrame with an added sys_received_at column. Use during data ingestion workflows requiring timestamps derived from filenames or modification times.

update_sys_modified_at_on_hash_change() ℹ️#

def update_sys_modified_at_on_hash_change(
        svcs: dict[Any, Any],
        table_name: str,
        pk_columns: list[str],
        mapped: str,
        run_id: str
) -> None:
    """
    Recompute `sys_modified_at` for every record of a Delta table based on hash-key changes.

    Rows are grouped by primary key and ordered by `sys_received_at`. A new "version" starts at
    the first row of each key and whenever `sys_hash_key` differs from the previous row. Rows of
    the first version receive the value of the `mapped` column (cast to timestamp); rows of later
    versions receive the earliest `sys_received_at` within their version.

    :param svcs: Service container; must provide "spark". A "logger" entry is used when present.
    :param table_name: Name of the Delta table to update (interpolated into the MERGE statement).
    :param pk_columns: Business primary-key columns used to group records.
    :param mapped: Column supplying `sys_modified_at` for the first version of each key.
    :param run_id: Job execution identifier; used for logging only.
    :return: None. The table is updated in place.
    """
    import uuid  # Local import: only needed to build a collision-free temp view name.

    spark = svcs.get("spark")
    logger = svcs.get("logger")
    if logger is not None:
        logger.info(f"Updating sys_modified_at in {table_name} (run_id={run_id}).")

    # Step 1: Load the Delta table into a Spark DataFrame
    df = spark.table(table_name)

    # Step 2: Partition the data by primary keys and order it by `sys_received_at`.
    # Helper columns are materialized one at a time because a window function may not be nested
    # inside another windowed aggregate.
    w = Window.partitionBy(*pk_columns).orderBy("sys_received_at")
    df = df.withColumn("_prev_hash_key", lag("sys_hash_key", 1).over(w))
    df = df.withColumn("_row_num", row_number().over(w))

    # Step 3: Flag the start of each version, then number versions with a running sum.
    # Both comparisons are parenthesized: `|` binds tighter than `==` in Python.
    starts_new_version = (col("_row_num") == 1) | (col("sys_hash_key") != col("_prev_hash_key"))
    df = df.withColumn("_starts_version", when(starts_new_version, 1).otherwise(0))
    df = df.withColumn("version_num", sum(col("_starts_version")).over(w))

    # Step 4: Assign new `sys_modified_at` timestamps per version
    version_window = Window.partitionBy(*pk_columns, "version_num")
    df = df.withColumn(
        "new_sys_modified_at",
        when(col("version_num") == 1, col(mapped).cast(TimestampType()))
        .otherwise(min("sys_received_at").over(version_window)),
    )

    # Step 5: Apply the changes to the Delta table.
    # The join uses `sys_surrogate_pk` because it identifies exactly one row; a hash key can
    # repeat across rows, which makes a MERGE ambiguous.
    # NOTE(review): assumes the table carries `sys_surrogate_pk` as written by autoloader_data.
    updates = df.select("sys_surrogate_pk", "new_sys_modified_at")
    view_name = f"sys_modified_at_updates_{uuid.uuid4().hex}"
    # A DataFrame cannot be interpolated into SQL text; expose it as a temporary view instead.
    updates.createOrReplaceTempView(view_name)
    sql_merge = f"""
    MERGE INTO {table_name} AS target
    USING {view_name} AS updates
    ON target.sys_surrogate_pk = updates.sys_surrogate_pk
    WHEN MATCHED THEN
      UPDATE SET target.sys_modified_at = updates.new_sys_modified_at
    """
    try:
        spark.sql(sql_merge)
    finally:
        # Always remove the helper view, even when the MERGE fails.
        spark.catalog.dropTempView(view_name)

# Example Usage:
# Recompute sys_modified_at for the records of a Delta table
update_sys_modified_at_on_hash_change(
    svcs=services,  # service container providing the SparkSession
    table_name="my_table",  # table to read and update
    pk_columns=["id"],  # columns that group rows belonging to the same record
    mapped="original_timestamp_column",  # timestamp source for each record's first version
    run_id="run_1234"
)
# The function will detect hash changes and update `sys_modified_at` values accordingly.
What It Does Input Output When to Use
Updates the sys_modified_at value for Delta table records if hash key changes are detected, ensuring SCD (Slowly Changing Dimension) handling. svcs (dict): Databricks services (e.g., SparkSession).
table_name (str): Fully qualified Delta table.
pk_columns (list): Primary keys.
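mapped (str): Name of the source timestamp column whose value (cast to timestamp) becomes sys_modified_at for the first version of each record.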
run_id (str): Job execution ID.
Updates the Delta table’s sys_modified_at column based on hash/key changes. Use for Bronze-to-Silver layer transformations or historical data versioning that needs SCD logic.

quality_checks() ℹ️#

def quality_checks(svcs: dict[Any, Any], conf: dict[str, Any], table_name: str, enable_checks: dict[str, bool] = None) -> None:
    """
    Run the enabled data-quality checks against a Bronze Delta table.

    Quality flags are first cleared or initialised, then each enabled check (schema, primary
    keys, references, foreign keys, harmonization, maximum length) is delegated to the `dqf`
    helpers. A failed schema check stops all remaining checks.

    :param svcs: Databricks core services; must provide "spark" and "logger".
    :param conf: Configuration settings; the "databricks" section must contain `layer`,
        `catalog` and `schema_base`.
    :param table_name: Name of the Delta Bronze table being validated.
    :param enable_checks: Flags keyed by the `Constants` check names; a missing key means the
        check is disabled. `None` (the default) disables every optional check.
    :return: None
    """
    # A missing flag dictionary means "no optional checks" rather than a crash on `.get`.
    enable_checks = enable_checks or {}
    logger = svcs.get("logger")

    # Step 1: Prepare configurations
    layer = conf["databricks"]["layer"]  # Current pipeline layer (e.g., Bronze)
    catalog = conf["databricks"]["catalog"]  # Catalog for the table
    schema = f"{conf['databricks']['schema_base']}"  # Schema within the catalog
    bronze_table = f"`{catalog}`.`{schema}`.`{table_name}`"  # Fully qualified table name

    # Load the Delta table into a DataFrame
    df = svcs["spark"].read.format("delta").table(bronze_table)
    records_for_quality_checks = df.count()  # Used only for the log line below

    # Log details about the quality check being performed
    logger.info(f"Starting quality checks on Bronze table {table_name} for: {records_for_quality_checks} records.")

    # Fetch the primary key configuration
    pks = QUALITY_CHECKS.get(Constants.PKS, {})

    # -------------------------------------------------------------------------------------
    # 0. Clear quality flags on new or existing records
    # Either reset all flags or only initialise them, depending on CLEAR_STATUS.
    if enable_checks.get(Constants.CLEAR_STATUS, False):
        dqf.clear_quality_flags(svcs=svcs, bronze_table=bronze_table)
        logger.info(f"Cleared all quality flags for table: {bronze_table}.")
    else:
        dqf.set_quality_flags(svcs=svcs, bronze_table=bronze_table)
        logger.info(f"Set initial quality flags for new records in table: {bronze_table}.")

    # -------------------------------------------------------------------------------------
    # 1. Schema Validation
    # Ensure the table schema aligns with the expected schema definition.
    if enable_checks.get(Constants.SCHEMA, False):
        # Combine system fields and table-specific schema fields.
        combined_schema = StructType(
            JSON_SCHEMAS[layer.upper()].get(table_name).fields
            + JSON_SCHEMAS_SYS.get("sys").fields
        )

        # Check if schema matches the expected definition.
        if not dqf.check_schema(svcs=svcs, df_or_table=df, expected_schema=combined_schema, table=bronze_table):
            logger.error(f"Schema mismatch detected for table: {bronze_table}. Skipping further checks.")
            return  # Stop quality checks if the schema validation fails.

        logger.info(f"Schema successfully validated for table: {bronze_table}.")

    # -------------------------------------------------------------------------------------
    # 2. Primary Key (PK) Checks
    # Identify null or duplicate primary key violations in the table.
    if enable_checks.get(Constants.PKS, False):
        pk_cols = pks.get(table_name)

        # Check for null values in the primary key columns
        dqf.check_primary_key_nulls(svcs=svcs, pk_cols=pk_cols, bronze_table=bronze_table)
        logger.info(f"Primary key null check completed for table: {bronze_table}.")

        # Check for duplicate primary key values across records
        dqf.check_primary_key_duplicates(svcs=svcs, pk_cols=pk_cols, bronze_table=bronze_table)
        logger.info(f"Primary key duplicate check completed for table: {bronze_table}.")

    # -------------------------------------------------------------------------------------
    # 3. Reference (Lookup) Validation
    # Validate that records in the table reference valid keys from external lookup tables.
    if enable_checks.get(Constants.REFS, False):
        refs = QUALITY_CHECKS.get(Constants.REFS, {})
        for ref_key, ref_value in refs.get(table_name, {}).items():
            # Validate against the lookup table and columns
            dqf.check_lookup_values(
                svcs=svcs,
                bronze_table=bronze_table,
                bronze_col_name=ref_key,
                lookup_table=f"{catalog}.{ref_value.get('table')}",
                lookup_col_name=ref_value.get("field")
            )
            logger.info(f"Reference validation completed for field `{ref_key}` in table: {bronze_table}.")

    # -------------------------------------------------------------------------------------
    # 4. Foreign Key (FK) Checks
    # Ensure foreign key values reference existing primary keys in the remote tables.
    if enable_checks.get(Constants.FKS, False):
        fks = QUALITY_CHECKS.get(Constants.FKS, {})
        for fk_key, fk_value in fks.get(table_name, {}).items():
            dqf.check_foreign_key(
                svcs=svcs,
                bronze_table=bronze_table,
                bronze_col_fk=fk_key,
                remote_table=f"{catalog}.{schema}.{fk_value.get('table')}",
                remote_col_pk=fk_value.get("field")
            )
            logger.info(f"Foreign key validation completed for field `{fk_key}` in table: {bronze_table}.")

    # -------------------------------------------------------------------------------------
    # 5. Harmonization Validation
    # Check record values against the configured harmonization lookup tables.
    if enable_checks.get(Constants.HARMONIZATION, False):
        harm = QUALITY_CHECKS.get(Constants.HARMONIZATION, {})
        for harm_key, harm_value in harm.get(table_name, {}).items():
            dqf.check_harmonization_values(
                svcs=svcs,
                bronze_table=bronze_table,
                bronze_col_name=harm_key,
                lookup_table=f"{catalog}.{harm_value.get('table')}",
                lookup_col_name=harm_value.get("field")
            )
            logger.info(f"Harmonization validation completed for field `{harm_key}` in table: {bronze_table}.")

    # -------------------------------------------------------------------------------------
    # 6. Check Length Constraints
    # Verify that column values adhere to the configured maximum lengths.
    if enable_checks.get(Constants.VALUE_MAX_LENGTH, False):
        max_lengths = QUALITY_CHECKS.get(Constants.VALUE_MAX_LENGTH, {})
        for limit_key, limit_value in max_lengths.get(table_name, {}).items():
            dqf.check_max_length_field(
                svcs=svcs,
                bronze_table=bronze_table,
                column=limit_key,
                limit=limit_value,
            )
            logger.info(f"Length validation completed for field `{limit_key}` in table: {bronze_table}.")

# Example Usage:
# Initialize services and the configuration keys read by quality_checks
services = get_svcs("dev")
conf = {
    "databricks": {
        "layer": "Bronze",
        "catalog": "example_catalog",
        "schema_base": "example_schema"
    }
}
# Flags selecting which checks run; a missing or False entry disables that check.
# NOTE(review): keys are assumed to equal the corresponding `Constants` values - confirm.
enable_checks = {
    "SCHEMA": True,
    "PKS": True,
    "FKS": True,
    "REFS": True,
    "VALUE_MAX_LENGTH": True,
    "HARMONIZATION": True,
    "CLEAR_STATUS": False,
}
table_name = "example_table"

# Run quality checks
quality_checks(services, conf, table_name, enable_checks)
What It Does Input Output When to Use
Performs multiple quality validation checks on Bronze Delta tables, such as schema matching, primary key (PK), foreign key (FK), lookup validations, and harmonization checks. Tracks and flags errors, while ensuring clean and consistent data within the Bronze layer. svcs (dict): Databricks services like SparkSession and Logger.
conf (dict): Configuration settings for schema, catalog, and paths.
table_name (str): Name of the Delta table to check.
enable_checks (dict): A dictionary of boolean flags to enable or disable specific quality checks.
Updates quality flags (e.g., error identification) directly within the Bronze table. Use for ensuring data quality in Bronze Delta tables, aligning records to schema, PK/FK rules, and reference values. This function is useful before propagating records to Silver or Gold layers.

Click here to access the Modular Components

Click here to access official Databricks Autoloader Documentation

Click here to access the repository to view the example: DHO-PoC1/src/dhopoc1/landing_to_bronze.py

Click here to access the repository to view the example: DHO-PoC1/src/dhopoc1/quality_bronze.py

Click here to access the repository to view the example: DHO-PoC1/src/dhopoc1/databricks_utils_bronze.py