💡 Hover Feature
Look for ℹ️ icons in titles - hover over them to reveal additional information.
Go to Playbook Main Page
Next: File Ingestion Using Autoloader(Python) Tutorial
Back: Back to previous page
Landing to bronze#
The landing_to_bronze script enables efficient, scalable, and fault-tolerant ingestion of raw files from the landing zone into the bronze layer of a data lake. The bronze layer serves as the foundational repository for structured raw data, ensuring fidelity while preparing data for downstream transformations (silver and gold layers).
This document breaks down the script's components, functionality, and required dependencies, providing a clear framework for understanding and extending the script.
Assumptions#
- The Databricks infrastructure is pre-configured, including:
  - Access to schemas, directories, paths, and permissions for the bronze layer.
  - Properly initialized Spark sessions.
  - Logging utilities and runtime arguments configured.
- Environment-specific configurations (dev, staging, prod) are stored externally and dynamically fetched by the script.
- This pipeline processes raw files and may require predefined schemas in the bronze layer.
Note:
If these prerequisites are not met, refer to the Modular Components document in the reference section for detailed instructions on setting up the infrastructure, schemas, and common utilities.
Key Features#
| Feature | Description |
|---|---|
| Environment-Driven Execution | Fetches configurations (e.g., paths, tables) dynamically for each environment (dev, prod). |
| Databricks Autoloader | Utilizes Databricks Autoloader for schema inference, efficient ingestion, and incremental loads. |
| Parallel/Sequential Modes | Supports both parallel ingestion (via multithreading) and sequential ingestion modes. |
| Dynamic Infrastructure Setup | Automatically creates schemas, directories, and checkpoint locations for the bronze layer. |
| Logging and Traceability | Logs metadata like job_id, run_id, table names, and failures for easy debugging and monitoring. |
1.landing_to_bronze.py#
"""Entrypoint module"""
from conf.constants import Layers
from dhopoc1 import databricks_utils, databricks_utils_bronze, databricks_infrastructure
from conf import databricks_config
import concurrent.futures
def landing_to_bronze_dev():
    """Run the landing-to-bronze load against the ``dev`` environment."""
    target_env = "dev"
    landing_to_bronze(target_env)
def landing_to_bronze(env: str) -> None:
    """
    Loads raw json files from landing zone into bronze.

    Fetches the services and the autoloader configuration for ``env``,
    prepares the bronze schema/directories, then runs
    ``databricks_utils_bronze.autoloader_data`` for every table listed in
    ``conf["databricks"]["json_types"]``. Tables are processed concurrently
    when ``conf["databricks"]["parallel_processing"]`` is truthy, otherwise
    one by one.

    In parallel mode a failing table is logged and the remaining tables keep
    loading; in sequential mode the first failure propagates to the caller.

    :param env: Environment type
    :return: None
    """
    svcs = databricks_utils.get_svcs(env=env)
    conf = databricks_config.get_databricks_configuration(
        env=env, mode="autoloader", layer=Layers.Bronze.value
    )

    # prepare databricks infra
    databricks_infrastructure.create_schema(
        svcs=svcs,
        conf=conf,
        create_dirs=True
    )

    args = svcs['args']
    logger = svcs["logger"]
    logger.info(f"Databricks execution: JOBID:{args['job_id']} RUN_JOB_ID:{args['run_id']}")

    table_names = conf["databricks"]["json_types"]
    parallel_processing = bool(conf["databricks"].get("parallel_processing", False))

    if not parallel_processing:
        # regular execution: item in loop, one by one
        for table_name in table_names:
            logger.info(f"Loading data into bronze for table {table_name}")
            databricks_utils_bronze.autoloader_data(svcs=svcs, conf=conf, table_name=table_name, args=args)
        return

    # Using ThreadPoolExecutor to run tasks in parallel
    parallel_workers = int(conf["databricks"].get("max_parallel_processes", 10))
    with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_workers) as executor:
        futures = {
            executor.submit(databricks_utils_bronze.autoloader_data, svcs, conf, table_name, args): table_name
            for table_name in table_names
        }
        # Collect and process
        for future in concurrent.futures.as_completed(futures):
            table_name = futures[future]
            try:
                # result() re-raises whatever the worker raised; without this
                # call a failed load would be reported as completed.
                future.result()
            except Exception as e:
                logger.error(f"Data loading failed for {table_name}: {e}")
            else:
                logger.info(f"Data loading completed for {table_name}")
2.quality_bronze.py#
"""Bronze layer quality checks"""
from conf.constants import Layers
from dhopoc1 import databricks_utils, databricks_utils_bronze
from conf import databricks_config
import concurrent.futures
def quality_bronze_dev():
    """Run the bronze quality checks against the ``dev`` environment."""
    target_env = "dev"
    quality_bronze(target_env)
def quality_bronze(env: str) -> None:
    """
    Runs the bronze-layer quality checks for every configured table.

    Fetches the services and the autoloader configuration for ``env`` and
    calls ``databricks_utils_bronze.quality_checks`` for each table in
    ``conf["databricks"]["json_types"]``, passing the enabled checks from
    ``conf["databricks"]["quality_checks"]``. Tables are checked concurrently
    when ``conf["databricks"]["parallel_processing"]`` is truthy, otherwise
    one by one.

    In parallel mode a failing table is logged and the remaining tables are
    still checked; in sequential mode the first failure propagates.

    :param env: Environment type
    :return: None
    """
    svcs = databricks_utils.get_svcs(env=env)
    conf = databricks_config.get_databricks_configuration(
        env=env, mode="autoloader", layer=Layers.Bronze.value
    )

    enable_checks = conf["databricks"]["quality_checks"]
    table_names = conf["databricks"]["json_types"]
    parallel_processing = bool(conf["databricks"].get("parallel_processing", False))

    if not parallel_processing:
        # regular execution: item in loop, one by one
        for table_name in table_names:
            databricks_utils_bronze.quality_checks(
                svcs=svcs,
                conf=conf,
                table_name=table_name,
                enable_checks=enable_checks
            )
        return

    # Using ThreadPoolExecutor to run tasks in parallel
    logger = svcs['logger']
    parallel_workers = int(conf["databricks"].get("max_parallel_processes", 10))
    with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_workers) as executor:
        futures = {
            executor.submit(
                databricks_utils_bronze.quality_checks, svcs, conf, table_name, enable_checks
            ): table_name
            for table_name in table_names
        }
        # Collect and process
        for future in concurrent.futures.as_completed(futures):
            table_name = futures[future]
            try:
                # result() re-raises the worker's exception; without it a
                # failed check run would be reported as completed.
                future.result()
            except Exception as e:
                logger.error(f"Quality checks failed for {table_name}: {e}")
            else:
                logger.info(f"Quality checks completed for {table_name}")
databricks_utils_bronze.py#
from typing import Any
from conf.quality_checks_config import QUALITY_CHECKS
from pyspark.sql.types import StringType, StructType, TimestampType
from pyspark.sql.functions import (current_timestamp, col, lit, md5, concat_ws, lag, row_number, min, sum,
when, udf, coalesce)
from pyspark.sql import Window
from pyspark.sql import DataFrame, functions as F
import uuid
from dhopoc1 import databricks_quality_framework as dqf
from conf.schemas import JSON_SCHEMAS, JSON_SCHEMAS_SYS, METADATA_SCHEMA
from conf.constants import Constants
def generate_uuid():
    """Return a newly generated random (version 4) UUID in its string form."""
    new_id = uuid.uuid4()
    return str(new_id)
def update_sys_modified_at_on_hash_change(svcs: dict[Any, Any], table_name: str, pk_columns: list[str],
                                          mapped: str, run_id: str) -> None:
    """
    Recomputes sys_modified_at per record "version" and merges it back into the table.

    Rows are grouped by the PK columns and ordered by sys_received_at. A new
    version starts at the first row of a key and at every row whose
    sys_hash_key differs from the previous row. Rows of version 1 get the
    ``mapped`` column cast to timestamp; rows of later versions get the
    smallest sys_received_at of their version. Only rows whose current
    sys_modified_at differs from that value are merged back.

    :param svcs: Databricks services (reads 'spark' and 'logger')
    :param table_name: Fully qualified table name (e.g., 'db.schema.table')
    :param pk_columns: List of primary key column names (e.g., ['pk1', 'pk2'])
    :param mapped: mapped column for the table (source of the version-1 timestamp)
    :param run_id: run job id; currently not used by this function
    :return: None
    """
    spark = svcs.get('spark')
    source_df = spark.table(table_name)

    # One window per business key, ordered by arrival time.
    per_key_by_arrival = Window.partitionBy(*pk_columns).orderBy("sys_received_at")

    # 1 where the hash differs from the previous row of the same key.
    previous_hash = lag("sys_hash_key", 1).over(per_key_by_arrival)
    hash_changed = (col("sys_hash_key") != previous_hash).cast("int")
    # The first row of a key always opens a version.
    opens_partition = row_number().over(per_key_by_arrival) == 1
    version_start_flag = when(opens_partition | (hash_changed == 1), 1).otherwise(0)

    # Running total of version starts = version number.
    # NOTE: `sum` / `min` here are the pyspark.sql.functions imported at file level.
    running_frame = per_key_by_arrival.rowsBetween(Window.unboundedPreceding, 0)
    versioned_df = source_df.withColumn("version_num", sum(version_start_flag).over(running_frame))

    # Earliest arrival inside each (key, version) group.
    per_version = Window.partitionBy(*(pk_columns + ["version_num"]))
    first_version_ts = col(mapped).cast(TimestampType())
    later_version_ts = min("sys_received_at").over(per_version)
    versioned_df = versioned_df.withColumn(
        "new_sys_modified_at",
        when(col("version_num") == 1, first_version_ts).otherwise(later_version_ts),
    )

    # Only update if sys_modified_at differs from the new value
    current_modified_at = coalesce(col("sys_modified_at"), lit('1970-01-01 00:00:00'))
    changed_rows = versioned_df.filter(current_modified_at != col("new_sys_modified_at"))
    changed_rows = changed_rows.select(*pk_columns, "sys_received_at", "new_sys_modified_at")
    updates = changed_rows.dropDuplicates(pk_columns + ["sys_received_at"])

    # Temp view named after the last segment of the (unquoted) table name.
    bare_table = table_name.replace("`", "").split(".")[-1]
    temp_view_name = "tmpview_" + bare_table
    svcs['logger'].info(f'Createing tmp view {temp_view_name} for {table_name}')
    updates.createOrReplaceTempView(temp_view_name)

    match_terms = [f"target.{c} = updates.{c}" for c in pk_columns]
    match_terms.append("target.sys_received_at = updates.sys_received_at")
    join_cond = " AND ".join(match_terms)

    sql = "\n".join([
        "",
        f"MERGE INTO {table_name} AS target",
        f"USING {temp_view_name} AS updates",
        f"ON {join_cond}",
        "WHEN MATCHED THEN",
        "UPDATE SET target.sys_modified_at = cast(updates.new_sys_modified_at as timestamp)",
        "",
    ])
    svcs.get("logger").info(sql)
    spark.sql(sql)
    # NOTE: a follow-up statement that reset sys_received_at to
    # current_timestamp() for rows with sys_job_id = run_id is disabled,
    # which is why run_id is unused here.
def add_sys_received_at_from_filename(
    df: DataFrame,
    file_name_col: str,
    table_name: str = "_metadata.file_name",
    output_col: str = "sys_received_at"
) -> DataFrame:
    """Adds a timestamp column extracted from a file name column using a standard pattern.

    The file name is expected to contain ``<table_name>_yyyy-MM-dd_HH_mm_ss``.
    When that pattern is not found in the file name, the value of
    ``_metadata.file_modification_time`` is used instead.

    :param df: Input DataFrame; must expose the ``_metadata`` struct column
    :param file_name_col: Name of the column containing the file name (e.g., '_metadata.file_name')
    :param table_name: Table name prefix in the file name. It is matched literally.
        NOTE(review): the default looks like a copy of the column path rather than
        a real table name; callers should always pass the table name.
    :param output_col: Name of the output timestamp column (default: 'sys_received_at')
    :return: DataFrame with the new timestamp column
    """
    import re  # local import: only needed to escape the prefix below

    # Escape the prefix so regex metacharacters in a table name (e.g. '.')
    # are matched literally instead of being interpreted as pattern syntax.
    prefix = re.escape(table_name)
    pattern = f"{prefix}_(\\d{{4}}-\\d{{2}}-\\d{{2}}_\\d{{2}}_\\d{{2}}_\\d{{2}})"

    extracted = F.regexp_extract(F.col(file_name_col), pattern, 1)
    received_at_ts = F.to_timestamp(extracted, "yyyy-MM-dd_HH_mm_ss")
    backup_received_at = F.col("_metadata").file_modification_time

    # Use backup if extraction failed (null or empty string)
    extraction_failed = extracted.isNull() | (extracted == "")
    sys_received_at = F.when(extraction_failed, backup_received_at).otherwise(received_at_ts)
    return df.withColumn(output_col, sys_received_at)
def autoloader_data(svcs: dict[Any, Any], conf: dict[str, Any], table_name: str, args: dict[str, str]) -> None:
    """
    Loads input json files into bronze table.

    Reads the table's JSON files with Auto Loader (``cloudFiles``) using the
    predefined schema, adds the ``sys_*`` columns, appends the result to the
    Delta table in a single triggered batch, and then recomputes
    ``sys_modified_at`` for the table. A failure in that last step is logged
    and does not fail the load.

    :param args: asset bundle job parameters dict (``run_id`` is read)
    :param svcs: Databricks services (``spark`` and ``logger`` are read)
    :param conf: Databricks configuration
    :param table_name: name of table to be created
    :raises ValueError: when ``conf["databricks"]["map_created_at"]`` has no
        column mapped for ``table_name``
    :return: None
    """
    logger = svcs.get("logger")

    # configs
    layer = str(conf["databricks"]["layer"])
    catalog = conf["databricks"]["catalog"]
    schema = f"{conf['databricks']['schema_base']}"
    incoming_path = conf["databricks"]["incoming_path"]
    failed_path = conf["databricks"]["failed_path"]
    landing_volume = conf["databricks"]["landing_volume"]
    checkpoint = conf["databricks"]["checkpoint"]
    map_created_at = conf["databricks"]["map_created_at"]
    target_table = f"`{catalog}`.`{schema}`.`{table_name}`"
    run_id = args["run_id"]
    logger.info(f"Execution details {args}")

    # paths
    volume_root = f"/Volumes/{catalog}/{schema}/{landing_volume}"
    source_path = f"{volume_root}/{incoming_path}/{table_name}/"
    checkpoint_path = f"{volume_root}/{checkpoint}/{table_name}/"
    bad_files_path = f"{volume_root}/{failed_path}/{table_name}/"
    logger.info(f"Loading data from {source_path}")

    # mapping for sys_modified_at, timestamp modification at source.
    # Validated before the stream is set up so a configuration gap fails fast.
    col_map_value = map_created_at.get(table_name)
    if not col_map_value:
        raise ValueError(f"No map_created_at column configured for table {table_name}")

    # schemas
    table_json_schema = JSON_SCHEMAS.get(layer.upper(), {}).get(table_name)
    metadata_autoloader_schema = METADATA_SCHEMA

    # Register the UDF
    uuid_udf = udf(generate_uuid, StringType())

    # read source data
    df = (
        svcs.get("spark")
        .readStream.format("cloudFiles")
        .option("multiline", True)
        .option("cloudFiles.format", "json")
        .schema(table_json_schema)
        .option("cloudFiles.schemaLocation", checkpoint_path)
        .option("badRecordsPath", bad_files_path)
        .load(source_path)
        .select("*", "_metadata")
    )

    # basic checks ADD MORE if needed
    df = df.filter(col("_metadata").isNotNull())
    df = df.withColumn("clean_metadata", col("_metadata").cast(metadata_autoloader_schema))
    df = df.drop("_metadata").withColumnRenamed("clean_metadata", "_metadata")

    # grab business columns (captured before any sys_* column is added so the
    # hash below covers only source data)
    bc_columns = df.drop("_metadata").columns

    # Expanding the DataFrame with new columns initialized to None
    for field in JSON_SCHEMAS_SYS.get('sys').fields:
        df = df.withColumn(field.name, lit(None).cast(field.dataType))

    # sys_received_at is extracted from file name and casted to timestamp
    df = add_sys_received_at_from_filename(df=df,
                                           file_name_col="_metadata.file_name",
                                           table_name=table_name)
    df = df.withColumn("sys_is_deleted", lit(False))
    df = df.withColumn("sys_job_id", lit(run_id))
    df = df.withColumn("sys_source_filename", df["_metadata"].file_name)
    df = df.withColumn("sys_source_name", lit('kakao'))
    df = df.withColumn("sys_modified_at", col(col_map_value).cast(TimestampType()))
    df = df.withColumn("concat_tmp", concat_ws("|", *[col(c) for c in bc_columns]))
    df = df.withColumn("sys_hash_key", md5(col("concat_tmp"))).drop("concat_tmp")
    df = df.withColumn("sys_surrogate_pk", uuid_udf())

    # write stream into table
    query = (
        df.writeStream.format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .trigger(once=True)  # batch
        .toTable(target_table)
    )
    # wait till micro batch stored
    query.awaitTermination()

    # update modified_at
    logger.info(f'start doing update sys_modified_at for {run_id}')
    pk_cols = QUALITY_CHECKS.get(Constants.PKS, {}).get(table_name)
    if not pk_cols:
        # Without configured keys there is nothing to partition by; report it
        # explicitly instead of failing with an opaque TypeError.
        logger.error(f"SCD NOT applied with ERROR on {target_table}: no primary keys configured for {table_name}")
        return

    part_cols = [col_name for col_name in pk_cols if col_name != "sys_received_at"]
    try:
        update_sys_modified_at_on_hash_change(svcs=svcs,
                                              table_name=target_table,
                                              pk_columns=part_cols,
                                              mapped=col_map_value,
                                              run_id=run_id)
    except Exception as e:
        # Best effort: the data is already stored, so only report the failure.
        logger.error(f"SCD NOT applied with ERROR on {target_table}: {e}")
    else:
        # Logged only on success; previously this was emitted after failures too.
        logger.info(f"SCD applied on {target_table}")
def quality_checks(svcs: dict[Any, Any], conf: dict[str, Any], table_name: str,
                   enable_checks: dict[str, str] = None) -> None:
    """
    Runs the enabled quality checks against one bronze table.

    Each check is executed only when its key in ``enable_checks`` is truthy.
    The order is: quality-flag reset, schema, primary keys, lookup/reference
    values, foreign keys, harmonization, max value length, SCD activation.
    A schema mismatch stops processing of the table.

    :param svcs: Databricks services (``spark`` and ``logger`` are read)
    :param conf: Databricks configuration
    :param table_name: Name of table to be checked against quality framework
    :param enable_checks: Dictionary with information about enabled checks;
        ``None`` is treated as "no optional checks enabled"
    :return: None
    """
    # Guard the None default: every branch below calls .get() on this mapping.
    checks = enable_checks or {}
    logger = svcs.get("logger")

    # prepare config for table
    layer = str(conf["databricks"]["layer"])
    catalog = conf["databricks"]["catalog"]
    schema = f"{conf['databricks']['schema_base']}"
    bronze_table = f"`{catalog}`.`{schema}`.`{table_name}`"

    df = svcs['spark'].read.format("delta").table(bronze_table)
    records_for_quality_checks = df.count()
    logger.info(f"Quality checks on bronze table {table_name} will be executed for: "
                f"{records_for_quality_checks}")

    pks = QUALITY_CHECKS.get(Constants.PKS, {})
    pk_cols = pks.get(table_name)

    # 0. Clear status of records
    if checks.get(Constants.CLEAR_STATUS, False):
        dqf.clear_quality_flags(svcs=svcs, bronze_table=bronze_table)
        logger.info(f"All records quality flags cleared for table: {bronze_table}")
    else:
        dqf.set_quality_flags(svcs=svcs, bronze_table=bronze_table)
        logger.info(f"New records quality flags cleared for table: {bronze_table}")

    # 1. Schema check
    if checks.get(Constants.SCHEMA, False):
        combined_schema = StructType(
            JSON_SCHEMAS[layer.upper()].get(table_name).fields
            + JSON_SCHEMAS_SYS.get('sys').fields
            + JSON_SCHEMAS_SYS.get('autoloader').fields
        )
        if not dqf.check_schema(svcs=svcs, df_or_table=df, expected_schema=combined_schema, table=bronze_table):
            logger.info(f"Schema mismatch detected, skipping table: {bronze_table}")
            return
        logger.info(f"Schema for table: {bronze_table} aligned with expected")

    # 2. PKs check
    if checks.get(Constants.PKS, False):
        dqf.check_primary_key_nulls(svcs=svcs, pk_cols=pk_cols, bronze_table=bronze_table)
        logger.info(f"PKs null for table: {bronze_table} checked")
        dqf.check_primary_key_duplicates(svcs=svcs, pk_cols=pk_cols, bronze_table=bronze_table)
        logger.info(f"PKs duplicates for table: {bronze_table} checked")

    # 3. lookup/reference checks
    if checks.get(Constants.REFS, False):
        refs = QUALITY_CHECKS.get(Constants.REFS, {})
        # check last modification of every reference table named in the config
        table_names = [
            entry["table"]
            for subdict in refs.values()
            for entry in subdict.values()
        ]
        table_modification_time_dict = dqf.get_ref_latest_timestamp(
            svcs=svcs,
            catalog=catalog,
            table_names=table_names
        )
        for ref_key, ref_value in refs.get(table_name, {}).items():
            logger.info(f"Validating ref for table: {bronze_table} ref: {ref_key}")
            dqf.check_lookup_values(
                svcs=svcs,
                bronze_table=bronze_table,
                bronze_col_name=ref_key,
                lookup_table=f"{catalog}.{ref_value.get('table')}",
                last_ref_modified=table_modification_time_dict[
                    f"{ref_value.get('table')}"
                ],
                lookup_col_name=ref_value.get('field')
            )
            logger.info(f"REF for table: {bronze_table} and {ref_key} checked")

    # 4. FK checks
    if checks.get(Constants.FKS, False):
        fks = QUALITY_CHECKS.get(Constants.FKS, {})
        for ref_key, ref_value in fks.get(table_name, {}).items():
            logger.info(f"Validating FK for table: {bronze_table} ref: {ref_key}")
            dqf.check_foreign_key(
                svcs=svcs,
                bronze_table=bronze_table,
                bronze_col_fk=ref_key,
                remote_table=f"{catalog}.{schema}.{ref_value.get('table')}",
                remote_col_pk=ref_value.get('field')
            )
            logger.info(f"FK for table: {bronze_table} and {ref_key} checked")

    # 5. Harmonization
    if checks.get(Constants.HARMONIZATION, False):
        harm = QUALITY_CHECKS.get(Constants.HARMONIZATION, {})
        for harm_key, harm_value in harm.get(table_name, {}).items():
            logger.info(f"Validating HARMONIZATION for table: {bronze_table} harmonization: {harm_key}")
            dqf.check_harmonization_values(
                svcs=svcs,
                bronze_table=bronze_table,
                bronze_col_name=harm_key,
                lookup_table=f"{catalog}.{harm_value.get('table')}",
                lookup_col_name=harm_value.get('field')
            )
            logger.info(f"HARMONIZATION for table: {bronze_table} and {harm_key} checked")

    # 6. Check length
    if checks.get(Constants.VALUE_MAX_LENGTH, False):
        max_lengths = QUALITY_CHECKS.get(Constants.VALUE_MAX_LENGTH, {})
        for limit_key, limit_value in max_lengths.get(table_name, {}).items():
            dqf.check_max_length_field(
                svcs=svcs,
                bronze_table=bronze_table,
                column=limit_key,
                limit=limit_value
            )
            logger.info(f"Value length for table: {bronze_table} and {limit_key} "
                        f"with limit {limit_value} checked")

    # 7. Activate SCD
    if checks.get(Constants.SCD, False):
        dqf.calculate_active(svcs=svcs, bronze_table=bronze_table, pk_cols=pk_cols)
        logger.info(f"SCD for table: {bronze_table} applied ")
Code Walkthrough#
Importing Required Modules ℹ️#
[From conf.constants]
• Defines the layers of processing (Bronze, Silver, Gold) in the medallion architecture.
• Declared in the shared 'constants' file.
• databricks_utils:
• Contains core utilities like 'get_svcs', which initialize Databricks services for processing.
• databricks_utils_bronze:
• Focused on Bronze-layer-specific tasks, such as autoloader ingestion.
• databricks_infrastructure:
• Supports schema creation and directory preparation tasks.
• databricks_config:
• Handles environment-specific configuration retrieval.
• concurrent.futures:
• Standard Python library used to enable parallel execution for processing tables concurrently.
Note:
For detailed information about:
- Configuration
- Utilities
- Common Functions
- Infrastructure Functions
Please refer to the Modular Components document located in the Reference section.
from conf.constants import Layers
from dhopoc1 import databricks_utils, databricks_utils_bronze, databricks_infrastructure
from conf import databricks_config
import concurrent.futures
Entrypoint for Dev Environment ℹ️#
Acts as a shortcut to process data for the "dev" environment by calling the main landing_to_bronze function.
def landing_to_bronze_dev():
    """Run the landing-to-bronze load against the ``dev`` environment."""
    target_env = "dev"
    landing_to_bronze(target_env)
Fetching Databricks Services and Configuration ℹ️#
[From databricks_utils]
• Initializes Databricks services:
• spark session: Enables Spark SQL/Delta capabilities.
• dbutils: Provides filesystem and utility functions.
• logger: Outputs logs related to execution.
• args: Fetches execution arguments (job_id, run_id).
[From databricks_config]
• Fetches environment-specific configuration for the Bronze layer.
Key Configuration Values:
• json_types: List of tables to process (e.g., ConsentStatus, Medication, etc.).
• parallel_processing: Boolean flag to enable concurrent processing.
• Directory paths: Includes incoming_path, failed_path, and checkpoint.
svcs = databricks_utils.get_svcs(env=env)
conf = databricks_config.get_databricks_configuration(env=env, mode="autoloader", layer=Layers.Bronze.value)
Infrastructure Setup ℹ️#
[From databricks_infrastructure]
• Handles schema and directory setup. Gets the schema and configuration details from the schema and Databricks configuration files.
• Key Operations:
• Creates schema if not present (CREATE SCHEMA IF NOT EXISTS).
• Creates directories (e.g., incoming, failed, checkpoint).
• Clears existing environment if enabled (conf["databricks"]["recreate_env"]).
Note:
For detailed information about:
- Configuration
- Utilities
- Common Functions
- Infrastructure Functions
Please refer to the Modular Components document located in the Reference section.
databricks_infrastructure.create_schema(
svcs=svcs,
conf=conf,
create_dirs=True
)
Logging Execution Details ℹ️#
Getting args:
• Fetched via databricks_utils.get_svcs():
• job_id: Identifier for the Databricks pipeline.
• run_id: Unique ID for the specific run.
Logging:
• Logs execution details using the logger from svcs.
Note:
For detailed information about:
- Configuration
- Utilities
- Common Functions
- Infrastructure Functions
Please refer to the Modular Components document located in the Reference section.
args = svcs['args']
svcs["logger"].info(f"Databricks execution: JOBID:{args['job_id']} RUN_JOB_ID:{args['run_id']}")
Processing#
Parallel Processing ℹ️#
Parallel Execution (enabled by conf["databricks"]["parallel_processing"]):
• Processes the files from Incoming directory concurrently using ThreadPoolExecutor and writes the data incrementally to Delta tables in the Bronze layer.
autoloader_data [From databricks_utils_bronze]:
• Reads data from the incoming_path and writes data to Delta table
• Uses Audit fields such as `sys_surrogate_pk` and `sys_hash_key` for deduplication, lineage tracking, and version control.
• Since AutoLoader inherently tracks processed files using a checkpoint directory, it skips already ingested files automatically. This ensures that:
- No duplicate files are reprocessed.
- All processed files remain in the `incoming_path`, and there's no need for additional file movement to a `loaded_path`.
- Bad files are moved to the bad-files directory.
Note: For a detailed explanation of the autoloader_data function, see the Databricks Bronze Utility section.
if parallel_processing:
parallel_workers = int(conf["databricks"].get("max_parallel_processes", 10))
with concurrent.futures.ThreadPoolExecutor(max_workers=parallel_workers) as executor:
futures = {executor.submit(
databricks_utils_bronze.autoloader_data, svcs, conf, table_name, args
): table_name for table_name in table_names}
Sequential Processing ℹ️#
• Processes each table one by one in a sequential manner, calling autoloader_data for each table.
else:
for table_name in conf["databricks"]["json_types"]:
databricks_utils_bronze.autoloader_data(svcs=svcs, conf=conf, table_name=table_name, args=args)
Bronze Layer Quality Checks#
The quality_bronze module serves as the final validation step for the Bronze layer pipeline. After loading raw JSON files into Delta tables via the landing_to_bronze process, this module validates that the ingested data meets all required quality standards, such as schema compliance, primary/foreign key validation, and harmonization checks.
Note on Implementation#
The process and structure of quality_bronze are almost identical to landing_to_bronze, except for the core function called for each table.
- In landing_to_bronze, the autoloader_data function is used to ingest data into the Bronze layer.
- In quality_bronze, the quality_checks function is used to validate the ingested data.
Core Difference#
For each table in the configuration, the following function is invoked in quality_bronze to perform quality checks:
databricks_utils_bronze.quality_checks(
svcs=svcs, # Databricks services (e.g., Spark session, logger)
conf=conf, # Environment-specific configuration
table_name=table_name, # Name of the current table being validated
enable_checks=enable_checks # Dictionary of quality checks to apply (e.g., schema, PK, FK, harmonization)
)
Databricks Bronze Utility#
The Databricks Bronze Utility module (databricks_utils_bronze) is designed to manage the ingestion of raw data into the Bronze layer of the medallion architecture. This module ensures the data is properly structured, enriched with audit data, and stored in Delta tables for further processing in the Silver and Gold layers.
Primary Responsibilities#
- Ingest raw data from a landing zone using Databricks Autoloader.
- Validate and enhance data by applying schema validation and adding audit fields.
- Ensure data consistency with Slowly Changing Dimensions (SCD) updates.
- Enable scalability through streaming and batch processing.
- Prepare data for downstream layers by ensuring clean, governed Bronze-level data.
This utility acts as the entry point for onboarding raw data into Delta Lake, enforcing data quality checks and providing robust handling of metadata during the ingestion process.
List of Available Functions#
Here is the list of functions provided in the module along with their purpose:
| Function Name | Description |
|---|---|
| generate_uuid | Generates a unique UUID. Used for creating surrogate primary keys (sys_surrogate_pk) during ingestion. |
| update_sys_modified_at_on_hash_change | Updates the sys_modified_at column for records where the hash key changes (supports tracking SCD data). |
| add_sys_received_at_from_filename | Extracts a timestamp from filenames, falling back to file modification time if unavailable. |
| autoloader_data | Ingests JSON files into Bronze Delta tables. Enhances data with metadata columns and writes to streaming/Delta tables. |
| quality_checks | Performs data quality checks, such as schema validation, primary/foreign key checks, and harmonization logic. |
autoloader_data() ℹ️#
def autoloader_data(svcs: dict[Any, Any], conf: dict[str, Any], table_name: str, args: dict[str, str]) -> None:
    """
    Reads input JSON files from a source path and loads them into a Delta Bronze table,
    enriching the data with additional metadata fields. Uses Databricks Auto Loader for streaming.

    :param svcs: Databricks services (e.g., SparkSession, Logger, etc.).
    :param conf: Configuration dictionary containing paths, catalog, schema, layer, etc.
    :param table_name: Name of the Delta Bronze table where the processed data is written.
    :param args: Job-specific parameters, such as `run_id`.
    :return: None
    """
    logger = svcs.get("logger")

    # Step 1: Extract catalog, schema, and other configurations
    catalog = conf["databricks"]["catalog"]
    schema = conf["databricks"]["schema_base"]
    incoming_path = conf["databricks"]["incoming_path"]
    failed_path = conf["databricks"]["failed_path"]
    landing_volume = conf["databricks"]["landing_volume"]
    checkpoint = conf["databricks"]["checkpoint"]
    target_table = f"`{catalog}`.`{schema}`.`{table_name}`"
    run_id = args["run_id"]

    # Paths for data ingestion, checkpoints, and bad records
    source_path = f"/Volumes/{catalog}/{schema}/{landing_volume}/{incoming_path}/{table_name}/"
    checkpoint_path = f"/Volumes/{catalog}/{schema}/{landing_volume}/{checkpoint}/{table_name}/"
    bad_files_path = f"/Volumes/{catalog}/{schema}/{landing_volume}/{failed_path}/{table_name}/"

    # Log execution details for troubleshooting and monitoring
    logger.info(f"Execution details: {args}")
    logger.info(f"Source path: {source_path}")

    # Step 2: Extract the table schema and metadata schema
    table_json_schema = JSON_SCHEMAS.get(conf["databricks"]["layer"].upper(), {}).get(table_name)
    metadata_schema = METADATA_SCHEMA

    # Step 3: Read source data with Auto Loader using the predefined schema
    df = (
        svcs["spark"]
        .readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")  # Expecting JSON file format
        .schema(table_json_schema)  # Apply predefined schema for input data
        .option("cloudFiles.schemaLocation", checkpoint_path)  # Schema tracking location
        .option("badRecordsPath", bad_files_path)  # Where bad records are written
        .load(source_path)  # Load records from the specified source path
        .select("*", "_metadata")  # Include Auto Loader file metadata
    )

    # Step 4: Filter invalid data and enrich with metadata fields
    # Drop rows that carry no file metadata.
    df = df.filter(col("_metadata").isNotNull())
    # Cast `_metadata` to the expected metadata schema.
    df = df.withColumn("clean_metadata", col("_metadata").cast(metadata_schema))
    df = df.drop("_metadata").withColumnRenamed("clean_metadata", "_metadata")

    # Capture the business columns BEFORE any sys_* column is added. The hash
    # below must cover source data only: including run-specific values such as
    # the ingestion timestamp or the job id would change the hash on every run
    # and make change detection meaningless.
    business_columns = df.drop("_metadata").columns

    # Add system-generated metadata fields for lineage and traceability
    # `sys_received_at`: Timestamp for when the pipeline ingests the record
    df = df.withColumn("sys_received_at", current_timestamp())
    # `sys_source_filename`: Name of the file the record originates from
    df = df.withColumn("sys_source_filename", df["_metadata"].file_name)
    # `sys_source_name`: Name of the source system (e.g., "kakao" in this case)
    df = df.withColumn("sys_source_name", lit("kakao"))  # Example value for system source
    # `sys_is_deleted`: False for all incoming records in the Bronze layer
    df = df.withColumn("sys_is_deleted", lit(False))
    # `sys_job_id`: Identifier assigned to the specific pipeline job execution
    df = df.withColumn("sys_job_id", lit(run_id))

    # Step 5: Generate hash keys and surrogate keys
    # `sys_hash_key`: MD5 over the "|"-joined business columns.
    concat_col = concat_ws("|", *[col(c) for c in business_columns])
    df = df.withColumn("sys_hash_key", md5(concat_col))
    # `sys_surrogate_pk`: a random UUID per record.
    uuid_udf = udf(generate_uuid, StringType())
    df = df.withColumn("sys_surrogate_pk", uuid_udf())

    # Step 6: Write enriched data into the target Bronze Delta table
    query = (
        df.writeStream.format("delta")
        .outputMode("append")  # Append new records into the Delta table
        .option("checkpointLocation", checkpoint_path)  # Streaming checkpoint location
        .trigger(once=True)  # Process the available data once, then stop
        .toTable(target_table)  # Save processed records into the Delta Bronze table
    )
    # Wait until the streaming query completes
    query.awaitTermination()

    # Log success message after successful ingestion and data write
    logger.info(f"Successfully loaded data into: {target_table}")
# Example Usage:
# Initialize services and configuration
services = get_svcs("dev")
conf = {
    "databricks": {
        # autoloader_data reads conf["databricks"]["layer"] to pick the JSON
        # schema; without it the call fails with KeyError.
        # NOTE(review): "bronze" is an assumed value - confirm against conf.constants.Layers.
        "layer": "bronze",
        "catalog": "example_catalog",
        "schema_base": "example_schema",
        "landing_volume": "raw_data",
        "incoming_path": "incoming",
        "failed_path": "bad_records",
        "checkpoint": "checkpoints",
    }
}
args = {"run_id": "job12345"}
# Call the autoloader function
autoloader_data(services, conf, "example_table", args)
# Outputs:
# 1. Data from JSON files is read and written to the "example_table".
# 2. Invalid files are redirected to `failed_path` for troubleshooting.
# 3. Metadata fields like `sys_received_at`, `sys_surrogate_pk`, and `sys_hash_key` are added.
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Ingests JSON files into Delta Bronze tables, ensuring data quality through filtering and metadata enrichment. Applies surrogate key generation for traceability and schema validation for consistency. Tracks ingestion progress using checkpoint files for fault tolerance. | svcs (dict): Core services like Logger, SparkSession, and DBUtils. conf (dict): Configuration for paths, schema details, and metadata. table_name (str): Target Delta table. args (dict): Job-specific parameters, such as run_id. | None. Processed data includes metadata fields (sys_received_at, sys_job_id, sys_hash_key, sys_source_filename, and sys_surrogate_pk) and is written to Delta Bronze tables. Malformed records are redirected to the badRecordsPath. Checkpoint files ensure fault tolerance for batch or streaming ingestion. | Use for fault-tolerant real-time or batch ingestion pipelines. Ideal for processing raw data into Bronze Delta tables that require unique identifiers, metadata enrichment, and incremental ingestion. |
generate_uuid() ℹ️#
def generate_uuid() -> str:
    """
    Return a freshly generated random (version 4) UUID as its canonical string.

    :return: The 36-character hyphenated text form of ``uuid.uuid4()``.
    """
    fresh_id = uuid.uuid4()
    return str(fresh_id)
# Example usage of generate_uuid():
# The function takes no arguments and returns str(uuid.uuid4()); store the result as the record's identifier.
record_id = generate_uuid()
# Output: a unique 36-character string such as "3fa85f64-5717-4562-b3fc-2c963f66afa6"
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Generates a new Universally Unique Identifier (UUID), often used as a surrogate key for Delta table records. | None | A unique string identifier. | Use for creating unique records, enabling deduplication or version tracking in Bronze tables. |
| --- |
add_sys_received_at_from_filename() ℹ️#
def add_sys_received_at_from_filename(
    df: DataFrame,
    file_name_col: str,
    table_name: str = "_metadata.file_name",
    output_col: str = "sys_received_at"
) -> DataFrame:
    """
    Add a timestamp column derived from the timestamp embedded in each file name.

    The file name is searched for ``<table_name>_yyyy-MM-dd_HH_mm_ss``. When the
    name carries no such timestamp, or the matched text cannot be converted to a
    timestamp, the value falls back to ``_metadata.file_modification_time``.

    NOTE(review): the default of ``table_name`` looks like a column path rather
    than a table prefix, so it will practically never match a file name. It is
    kept unchanged for interface compatibility; callers should pass
    ``table_name`` explicitly - TODO confirm the intended default.

    :param df: Input DataFrame; must expose ``file_name_col`` and
        ``_metadata.file_modification_time``.
    :param file_name_col: Column (or nested field path) holding the file name.
    :param table_name: Literal prefix that precedes the timestamp in the file name.
    :param output_col: Name of the column to add (or overwrite).
    :return: ``df`` with ``output_col`` added.
    """
    # Local import keeps this block self-contained; `re` is standard library.
    import re

    # Step 1: Build the pattern. The prefix is escaped so characters such as "."
    # in `table_name` are matched literally instead of acting as regex operators.
    pattern = f"{re.escape(table_name)}_(\\d{{4}}-\\d{{2}}-\\d{{2}}_\\d{{2}}_\\d{{2}}_\\d{{2}})"
    extracted = F.regexp_extract(F.col(file_name_col), pattern, 1)  # "" when nothing matches
    received_at_ts = F.to_timestamp(extracted, "yyyy-MM-dd_HH_mm_ss")
    backup_received_at = F.col("_metadata.file_modification_time")

    # Step 2: Only attempt the conversion when something was extracted. The
    # `when` without `otherwise` yields NULL for a NULL/empty extraction, and
    # `coalesce` then also covers a matched-but-unparseable timestamp (e.g.
    # month 13), which the regex alone cannot rule out.
    parsed_or_null = F.when(extracted != "", received_at_ts)
    sys_received_at = F.coalesce(parsed_or_null, backup_received_at)
    return df.withColumn(output_col, sys_received_at)
# Example usage of add_sys_received_at_from_filename():
# Read the raw JSON files (assumes an active SparkSession is bound to `spark` - not defined in this snippet).
df = spark.read.format("json").load("/path/to/data")
# File names containing "my_table_<yyyy-MM-dd_HH_mm_ss>" supply the timestamp;
# when no timestamp is found in the name, `_metadata.file_modification_time` is used instead.
df_with_sys_received_at = add_sys_received_at_from_filename(
df=df,
file_name_col="_metadata.file_name",
table_name="my_table"
)
# The resulting DataFrame has a new timestamp column named "sys_received_at" (the default `output_col`).
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Extracts timestamps from filenames or _metadata. Defaults to file modification time if no valid timestamp is found. | df (DataFrame): Input DataFrame.<br>file_name_col (str): Column containing file names.<br>table_name (str): Source table prefix.<br>output_col (str): Column for extracted timestamp. | Returns the input DataFrame with an added sys_received_at column. | Use during data ingestion workflows requiring timestamps derived from filenames or modification times. |
update_sys_modified_at_on_hash_change() ℹ️#
def update_sys_modified_at_on_hash_change(
    svcs: dict[Any, Any],
    table_name: str,
    pk_columns: list[str],
    mapped: str,
    run_id: str
) -> None:
    """
    Recompute ``sys_modified_at`` for every row of a Delta table from its hash history.

    Rows are grouped by ``pk_columns`` and ordered by ``sys_received_at``. A new
    "version" starts at the first row of a key and whenever ``sys_hash_key``
    differs from the previous row. Rows in the first version take the value of
    the ``mapped`` column (cast to timestamp); rows in later versions take the
    earliest ``sys_received_at`` of their version. The result is written back
    with a ``MERGE``.

    NOTE(review): rows are matched on ``pk_columns`` + ``sys_received_at`` +
    ``sys_hash_key``. The previous condition used a ``hash_key`` column that
    nothing else in this function reads, and matching on a hash alone lets
    several source rows hit one target row - confirm the intended merge key.

    :param svcs: Service container; ``svcs["spark"]`` must be the SparkSession.
    :param table_name: Name of the Delta table to read and update.
    :param pk_columns: Columns identifying one business entity.
    :param mapped: Column providing ``sys_modified_at`` for first-version rows.
    :param run_id: Job run identifier. Currently unused; kept for interface
        compatibility.
    :return: None. The table is updated in place.
    """
    # Local import keeps this block self-contained; used only for the view name.
    import uuid

    spark = svcs.get("spark")

    # Step 1: Load the Delta table into a Spark DataFrame
    df = spark.table(table_name)

    # Step 2: Partition the data by primary keys and order it by `sys_received_at`
    w = Window.partitionBy(*pk_columns).orderBy("sys_received_at")

    # Materialise the per-row window results as columns first, so the running
    # sum below does not nest one window function inside another.
    df = (
        df.withColumn("_row_num", row_number().over(w))
        .withColumn(
            "_hash_changed",
            (col("sys_hash_key") != lag("sys_hash_key", 1).over(w)).cast("int"),
        )
    )

    # Step 3: Running count of version starts. Both comparisons are
    # parenthesised: `|` binds tighter than `==`, so the unparenthesised form
    # compared row_number() against `1 | (...)`.
    starts_new_version = (col("_row_num") == 1) | (col("_hash_changed") == 1)
    df = df.withColumn(
        "version_num",
        sum(when(starts_new_version, 1).otherwise(0)).over(w),
    )

    # Step 4: Assign new `sys_modified_at` timestamps
    version_window = Window.partitionBy(*pk_columns, "version_num")
    df = df.withColumn(
        "new_sys_modified_at",
        when(col("version_num") == 1, col(mapped).cast(TimestampType()))
        .otherwise(min("sys_received_at").over(version_window)),
    )

    # Step 5: Reduce to one row per merge key so MERGE never sees two source
    # rows for the same target row.
    key_columns = [*pk_columns, "sys_received_at", "sys_hash_key"]
    updates = df.groupBy(*key_columns).agg(
        min("new_sys_modified_at").alias("new_sys_modified_at")
    )

    # A DataFrame cannot be interpolated into SQL text; expose it as a temp
    # view. The random suffix avoids collisions between concurrent calls that
    # share one session.
    view_name = f"sys_modified_at_updates_{uuid.uuid4().hex}"
    updates.createOrReplaceTempView(view_name)

    # `<=>` is null-safe equality, so rows whose key columns contain NULL are
    # still matched to their own group.
    join_condition = " AND ".join(
        f"target.`{key}` <=> updates.`{key}`" for key in key_columns
    )
    sql_merge = f"""
        MERGE INTO {table_name} AS target
        USING {view_name} AS updates
        ON {join_condition}
        WHEN MATCHED THEN
            UPDATE SET target.sys_modified_at = updates.new_sys_modified_at
    """
    try:
        spark.sql(sql_merge)
    finally:
        # Always remove the helper view, even when the MERGE fails.
        spark.catalog.dropTempView(view_name)
# Example usage of update_sys_modified_at_on_hash_change():
# `services` must contain the SparkSession under the "spark" key.
# `pk_columns` defines the partition in which hash changes are detected (ordered by `sys_received_at`);
# `mapped` names the column that is cast to a timestamp for the first version of each key.
update_sys_modified_at_on_hash_change(
svcs=services,
table_name="my_table",
pk_columns=["id"],
mapped="original_timestamp_column",
run_id="run_1234"
)
# The function returns None; it detects hash changes and updates `sys_modified_at` in the table accordingly.
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Updates the sys_modified_at value for Delta table records if hash key changes are detected, ensuring SCD (Slowly Changing Dimension) handling. | svcs (dict): Databricks services (e.g., SparkSession).<br>table_name (str): Fully qualified Delta table.<br>pk_columns (list): Primary keys.<br>mapped (str): Mapped column.<br>run_id (str): Job execution ID. | Updates the Delta table's sys_modified_at column based on hash/key changes. | Use for Bronze-to-Silver layer transformations or historical data versioning that needs SCD logic. |
quality_checks() ℹ️#
def quality_checks(
    svcs: dict[Any, Any],
    conf: dict[str, Any],
    table_name: str,
    enable_checks: "dict[str, bool] | None" = None,
) -> None:
    """
    Run the enabled quality checks against a Bronze Delta table.

    Each check is delegated to a ``dqf`` helper and is only executed when its
    flag in ``enable_checks`` is truthy. If schema validation is enabled and
    fails, the function logs an error and returns without running the
    remaining checks.

    :param svcs: Databricks core services; ``svcs["spark"]`` and ``svcs["logger"]`` are used.
    :param conf: Configuration; reads ``conf["databricks"]`` keys ``layer``,
        ``catalog`` and ``schema_base``.
    :param table_name: Name of the Delta Bronze table being validated.
    :param enable_checks: Flags keyed by the ``Constants`` check names. Missing
        keys count as disabled; ``None`` (the default) disables every optional
        check, so only the initial quality flags are set.
    :return: None
    """
    # The default is None; normalise it so the `.get(...)` lookups below do not
    # raise AttributeError when the caller omits the argument.
    if enable_checks is None:
        enable_checks = {}

    logger = svcs.get("logger")

    # Step 1: Prepare configurations
    layer = conf["databricks"]["layer"]  # Current pipeline layer (e.g., Bronze)
    catalog = conf["databricks"]["catalog"]  # Catalog for the table
    schema = f"{conf['databricks']['schema_base']}"  # Schema within the catalog
    bronze_table = f"`{catalog}`.`{schema}`.`{table_name}`"  # Fully qualified table name

    # Load the Delta table into a DataFrame
    df = svcs["spark"].read.format("delta").table(bronze_table)
    records_for_quality_checks = df.count()  # Track number of records checked

    # Log details about the quality check being performed
    logger.info(f"Starting quality checks on Bronze table {table_name} for: {records_for_quality_checks} records.")

    # -------------------------------------------------------------------------------------
    # 0. Clear quality flags on new or existing records
    # Either reset all flags or only initialise them, depending on CLEAR_STATUS.
    if enable_checks.get(Constants.CLEAR_STATUS, False):
        dqf.clear_quality_flags(svcs=svcs, bronze_table=bronze_table)
        logger.info(f"Cleared all quality flags for table: {bronze_table}.")
    else:
        dqf.set_quality_flags(svcs=svcs, bronze_table=bronze_table)
        logger.info(f"Set initial quality flags for new records in table: {bronze_table}.")

    # -------------------------------------------------------------------------------------
    # 1. Schema Validation
    # Ensure the table schema aligns with the expected schema definition.
    if enable_checks.get(Constants.SCHEMA, False):
        # Combine table-specific schema fields and system fields.
        combined_schema = StructType(
            JSON_SCHEMAS[layer.upper()].get(table_name).fields
            + JSON_SCHEMAS_SYS.get("sys").fields
        )
        # Check if schema matches the expected definition.
        if not dqf.check_schema(svcs=svcs, df_or_table=df, expected_schema=combined_schema, table=bronze_table):
            logger.error(f"Schema mismatch detected for table: {bronze_table}. Skipping further checks.")
            return  # Stop quality checks if the schema validation fails.
        logger.info(f"Schema successfully validated for table: {bronze_table}.")

    # -------------------------------------------------------------------------------------
    # 2. Primary Key (PK) Checks
    # Identify null or duplicate primary key violations in the table.
    if enable_checks.get(Constants.PKS, False):
        pks = QUALITY_CHECKS.get(Constants.PKS, {})
        # Check for null values in the primary key columns
        dqf.check_primary_key_nulls(svcs=svcs, pk_cols=pks.get(table_name), bronze_table=bronze_table)
        logger.info(f"Primary key null check completed for table: {bronze_table}.")
        # Check for duplicate primary key values across records
        dqf.check_primary_key_duplicates(svcs=svcs, pk_cols=pks.get(table_name), bronze_table=bronze_table)
        logger.info(f"Primary key duplicate check completed for table: {bronze_table}.")

    # -------------------------------------------------------------------------------------
    # 3. Reference (Lookup) Validation
    # Validate that records in the table reference valid keys from external lookup tables.
    if enable_checks.get(Constants.REFS, False):
        refs = QUALITY_CHECKS.get(Constants.REFS, {})
        for ref_key, ref_value in refs.get(table_name, {}).items():
            # Validate against the lookup table and columns
            dqf.check_lookup_values(
                svcs=svcs,
                bronze_table=bronze_table,
                bronze_col_name=ref_key,
                lookup_table=f"{catalog}.{ref_value.get('table')}",
                lookup_col_name=ref_value.get("field"),
            )
            logger.info(f"Reference validation completed for field `{ref_key}` in table: {bronze_table}.")

    # -------------------------------------------------------------------------------------
    # 4. Foreign Key (FK) Checks
    # Ensure foreign keys reference valid primary keys in other tables of the same schema.
    if enable_checks.get(Constants.FKS, False):
        fks = QUALITY_CHECKS.get(Constants.FKS, {})
        for fk_key, fk_value in fks.get(table_name, {}).items():
            dqf.check_foreign_key(
                svcs=svcs,
                bronze_table=bronze_table,
                bronze_col_fk=fk_key,
                remote_table=f"{catalog}.{schema}.{fk_value.get('table')}",
                remote_col_pk=fk_value.get("field"),
            )
            logger.info(f"Foreign key validation completed for field `{fk_key}` in table: {bronze_table}.")

    # -------------------------------------------------------------------------------------
    # 5. Harmonization Validation
    # Check record values against the configured harmonization lookup tables.
    if enable_checks.get(Constants.HARMONIZATION, False):
        harm = QUALITY_CHECKS.get(Constants.HARMONIZATION, {})
        for harm_key, harm_value in harm.get(table_name, {}).items():
            dqf.check_harmonization_values(
                svcs=svcs,
                bronze_table=bronze_table,
                bronze_col_name=harm_key,
                lookup_table=f"{catalog}.{harm_value.get('table')}",
                lookup_col_name=harm_value.get("field"),
            )
            logger.info(f"Harmonization validation completed for field `{harm_key}` in table: {bronze_table}.")

    # -------------------------------------------------------------------------------------
    # 6. Check Length Constraints
    # Verify that column values adhere to the configured maximum lengths.
    if enable_checks.get(Constants.VALUE_MAX_LENGTH, False):
        max_lengths = QUALITY_CHECKS.get(Constants.VALUE_MAX_LENGTH, {})
        for limit_key, limit_value in max_lengths.get(table_name, {}).items():
            dqf.check_max_length_field(
                svcs=svcs,
                bronze_table=bronze_table,
                column=limit_key,
                limit=limit_value,
            )
            logger.info(f"Length validation completed for field `{limit_key}` in table: {bronze_table}.")
# Example usage of quality_checks():
# Obtain the services container for the "dev" environment.
services = get_svcs("dev")
# quality_checks() reads exactly these three keys from conf["databricks"].
conf = {
"databricks": {
"layer": "Bronze",
"catalog": "example_catalog",
"schema_base": "example_schema"
}
}
# One flag per check; a missing or False flag skips that check.
# NOTE(review): quality_checks() looks the flags up via Constants.* - confirm those constants equal these string keys.
enable_checks = {
"SCHEMA": True,
"PKS": True,
"FKS": True,
"REFS": True,
"VALUE_MAX_LENGTH": True,
"HARMONIZATION": True,
"CLEAR_STATUS": False,
}
table_name = "example_table"
# Run quality checks
quality_checks(services, conf, table_name, enable_checks)
| What It Does | Input | Output | When to Use |
|---|---|---|---|
| Performs multiple quality validation checks on Bronze Delta tables, such as schema matching, primary key (PK), foreign key (FK), lookup validations, and harmonization checks. Tracks and flags errors, while ensuring clean and consistent data within the Bronze layer. | svcs (dict): Databricks services like SparkSession and Logger.<br>conf (dict): Configuration settings for schema, catalog, and paths.<br>table_name (str): Name of the Delta table to check.<br>enable_checks (dict): A dictionary of boolean flags to enable or disable specific quality checks. | Updates quality flags (e.g., error identification) directly within the Bronze table. | Use for ensuring data quality in Bronze Delta tables, aligning records to schema, PK/FK rules, and reference values. This function is useful before propagating records to Silver or Gold layers. |
References or Important Links#
Click here to access the Modular Components
Click here to access official Databricks Autoloader Documentation
Click here to access the repository to view the example: DHO-PoC1/src/dhopoc1/landing_to_bronze.py
Click here to access the repository to view the example: DHO-PoC1/src/dhopoc1/quality_bronze.py