File Ingestion Using Python

This section walks through file ingestion with Python step by step. It covers reading raw JSON files from the Landing Zone, validating and enriching them, writing the results to the Bronze Layer, and handling processed and invalid files.

  1. Step 1: Landing Zone
    - Step 1.1: Dynamic Path Management
  2. Step 2: Data Validation and Enrichment
    - Step 2.1: Reading and Validating Data
    - Step 2.2: Enriching Data with Metadata
  3. Step 3: Persisting Data in the Bronze Layer
    - Step 3.1: Writing to Delta Table
  4. Step 4: Processed File Management
    - Step 4.1: Moving Processed Files
  5. Core Functions Table
  6. Source Code Repository

Step 1: Landing Zone

Step 1.1: Dynamic Path Management

Purpose: Dynamically builds the paths and names the pipeline uses:
- Raw files (incoming_path): directory where JSON files wait to be ingested.
- Malformed records (bad_files_path): directory where records that fail parsing or schema validation are written.
- Checkpoint directory (checkpoint_path): tracks progress for incremental processing.
- Processed files (loaded_path): directory that successfully processed files are moved to.
- Target Delta table (target_table): fully qualified name (catalog.schema.table) of the table that receives the enriched data.

Significance:
- Simplifies configuration management, because every path is derived from a few configuration values plus the table name.
- Keeps the pipeline portable across datasets and environments (e.g., dev, prd), because only the configuration changes.

Explanation:
- Inputs:
  - conf: configuration object containing base paths and properties (e.g., catalog, schema_base, landing_volume, incoming_dir).
  - table_name: dataset-specific identifier used to name the per-table directories and the target table.
- Outputs:
  - The tuple (incoming_path, bad_files_path, target_table, loaded_path). checkpoint_path is constructed but not returned by this batch implementation.

Code Walkthrough:

    def get_paths(conf, table_name):
        """
        Dynamically constructs all critical paths for data handling.

        Returns a tuple: (incoming_path, bad_files_path, target_table, loaded_path).
        """
        # Base settings from the Databricks section of the configuration
        catalog = conf["databricks"]["catalog"]
        schema = conf["databricks"]["schema_base"]
        landing_volume = conf["databricks"]["landing_volume"]
        incoming_dir = conf["databricks"]["incoming_dir"]
        checkpoint_dir = conf["databricks"]["checkpoint_dir"]
        failed_dir = conf["databricks"]["failed_dir"]
        loaded_dir = conf["databricks"]["loaded_dir"]

        # Root of the Unity Catalog landing volume for this source
        landing_path = f"/Volumes/{catalog}/{schema}/{landing_volume}/kakao"
        # One sub-directory per table under each landing folder
        incoming_path = f"{landing_path}/{incoming_dir}/{table_name}/"
        bad_files_path = f"{landing_path}/{failed_dir}/{table_name}/"
        checkpoint_path = f"{landing_path}/{checkpoint_dir}/{table_name}/"  # Built but not returned in this batch version
        loaded_path = f"{landing_path}/{loaded_dir}/{table_name}/"
        # Fully qualified three-level table name
        target_table = f"{catalog}.{schema}.{table_name}"

        return incoming_path, bad_files_path, target_table, loaded_path
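
get_paths returns a positional four-element tuple. Callers must therefore unpack the values in the right order, and checkpoint_path is not among them. The sketch below is a hedged alternative: it builds the same paths and returns all five as named fields. IngestionPaths and build_paths are hypothetical names, and the configuration values are placeholders. The example also shows which configuration keys get_paths reads.

```python
from typing import NamedTuple


class IngestionPaths(NamedTuple):
    """Hypothetical container for the paths built in Step 1.1."""
    incoming_path: str
    bad_files_path: str
    checkpoint_path: str
    loaded_path: str
    target_table: str


def build_paths(conf: dict, table_name: str) -> IngestionPaths:
    """Sketch of get_paths that also returns checkpoint_path, keyed by name."""
    db = conf["databricks"]
    catalog, schema = db["catalog"], db["schema_base"]
    # Same landing-root layout as get_paths, including the "kakao" segment.
    landing_path = f"/Volumes/{catalog}/{schema}/{db['landing_volume']}/kakao"

    def sub(dir_key: str) -> str:
        # One sub-directory per table under each landing folder.
        return f"{landing_path}/{db[dir_key]}/{table_name}/"

    return IngestionPaths(
        incoming_path=sub("incoming_dir"),
        bad_files_path=sub("failed_dir"),
        checkpoint_path=sub("checkpoint_dir"),
        loaded_path=sub("loaded_dir"),
        target_table=f"{catalog}.{schema}.{table_name}",
    )


# Hypothetical configuration values, for illustration only.
conf = {
    "databricks": {
        "catalog": "dev_catalog",
        "schema_base": "bronze",
        "landing_volume": "landing",
        "incoming_dir": "incoming",
        "checkpoint_dir": "checkpoints",
        "failed_dir": "failed",
        "loaded_dir": "loaded",
    }
}

paths = build_paths(conf, "orders")
print(paths.incoming_path)  # /Volumes/dev_catalog/bronze/landing/kakao/incoming/orders/
print(paths.target_table)   # dev_catalog.bronze.orders
```

Named fields make it harder to swap paths by accident, for example passing loaded_path where bad_files_path is expected. Whether to change the real get_paths signature depends on its callers, such as process_table.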

Step 2: Data Validation and Enrichment

Step 2.1: Reading and Validating Data

Purpose: Reads raw JSON files from the Landing Zone and validates them against a predefined schema. Records that cannot be parsed or do not match the schema are written to bad_files_path instead of being returned in the DataFrame.

Significance:
- Only well-structured data proceeds, which protects data quality downstream.
- Redirecting malformed records to a separate location lets the load continue past them while keeping them available for inspection.

Explanation:
- Inputs:
  - spark: SparkSession used to read the files.
  - incoming_path: directory containing the raw files to process.
  - bad_files_path: directory where malformed records are written, via the Databricks-specific badRecordsPath option.
  - json_schema: schema definition (e.g., a StructType) describing the expected structure of the raw JSON data.
- Outputs:
  - A Spark DataFrame containing the rows that match the schema.

Code Walkthrough:

    def read_landing_data(spark, incoming_path, bad_files_path, json_schema):
        """
        Reads and validates raw JSON data from the source directory.
        """
        return (
            spark.read
            .format("json")
            .schema(json_schema)                       # Enforce the expected schema instead of inferring it
            .option("multiline", True)                 # Each file may contain a multi-line JSON document
            .option("badRecordsPath", bad_files_path)  # Databricks-only: write malformed records here
            .load(incoming_path)                       # Define a DataFrame over all files in the folder
        )
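
The routing that badRecordsPath performs can be illustrated without Spark. The sketch below is a conceptual stand-in, not Spark's implementation. The hypothetical split_records function parses each JSON document and sends unparseable documents and type mismatches to a bad list. Like a Spark read with an explicit schema, it fills missing fields with None and ignores fields outside the schema. ORDER_SCHEMA and the sample documents are made up for illustration.

```python
from __future__ import annotations

import json
from typing import Any

# Hypothetical schema for illustration: field name -> expected Python type.
ORDER_SCHEMA: dict[str, type] = {"order_id": int, "amount": float, "status": str}


def split_records(docs: list[str], schema: dict[str, type]) -> tuple[list[dict[str, Any]], list[str]]:
    """Route each JSON document to the valid rows or to the bad-record list."""
    valid: list[dict[str, Any]] = []
    bad: list[str] = []
    for doc in docs:
        try:
            record = json.loads(doc)
        except json.JSONDecodeError:
            bad.append(doc)  # malformed JSON, e.g. a missing closing brace
            continue
        if not isinstance(record, dict):
            bad.append(doc)  # not a JSON object, so it cannot map onto the schema
            continue
        # Keep only schema fields; a missing field becomes None (a null column).
        row = {name: record.get(name) for name in schema}
        if all(row[name] is None or isinstance(row[name], expected) for name, expected in schema.items()):
            valid.append(row)
        else:
            bad.append(doc)  # type mismatch, e.g. a string where an int is expected
    return valid, bad


docs = [
    '{"order_id": 1, "amount": 19.5, "status": "new"}',
    '{"order_id": "two", "amount": 5.0, "status": "new"}',
    '{"order_id": 3, "amount": 7.25',
    '{"order_id": 4, "status": "shipped"}',
]
valid, bad = split_records(docs, ORDER_SCHEMA)
print(len(valid), len(bad))  # 2 2
```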

Step 2.2: Enriching Data with Metadata

Purpose: Adds operational metadata to support auditing and traceability. This includes ingestion_time (the processing timestamp) and file_name (the source file, for lineage tracking).

Significance:
- Provides metadata fields for debugging, historical analysis, and operational visibility.
- Gives every enriched record the same metadata structure for downstream processing.

Explanation:
- Inputs:
  - df: DataFrame of validated rows from Step 2.1.
  - metadata_schema: struct schema that the hidden _metadata file-metadata column is cast to (it must include file_name).
- Outputs:
  - A Spark DataFrame with an ingestion_time column, a typed _metadata column, and a file_name column for lineage.

Code Walkthrough:

    from pyspark.sql.functions import col, current_timestamp

    def enrich_and_clean_data(df, metadata_schema):
        """
        Adds metadata information to the DataFrame for traceability.
        """
        return (
            df.withColumn("ingestion_time", current_timestamp())                   # Same timestamp for every row in the query
            .filter(col("_metadata").isNotNull())                                   # Keep rows that carry file metadata
            .withColumn("clean_metadata", col("_metadata").cast(metadata_schema))  # Cast file metadata to the expected struct
            .drop("_metadata")                                                      # Drop the raw metadata column
            .withColumnRenamed("clean_metadata", "_metadata")                       # Expose the typed struct as _metadata
            .withColumn("file_name", col("_metadata.file_name"))                    # Source file name for lineage
        )
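
To make the row-level effect of enrich_and_clean_data concrete, here is a plain-Python sketch that applies the same steps to dictionaries. enrich_records and the sample rows are hypothetical. The struct cast to metadata_schema has no direct dictionary equivalent and is omitted.

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any


def enrich_records(records: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Apply the Step 2.2 enrichment to plain dicts (illustration only)."""
    # One timestamp for the whole batch: Spark evaluates current_timestamp() once per query.
    ingestion_time = datetime.now(timezone.utc)
    enriched = []
    for record in records:
        meta = record.get("_metadata")
        if meta is None:
            continue  # same effect as .filter(col("_metadata").isNotNull())
        out = dict(record)  # copy so the input rows are not mutated
        out["ingestion_time"] = ingestion_time
        out["file_name"] = meta["file_name"]  # lineage column derived from _metadata.file_name
        enriched.append(out)
    return enriched


rows = [
    {"order_id": 1, "_metadata": {"file_path": "/landing/incoming/orders/a.json", "file_name": "a.json"}},
    {"order_id": 2, "_metadata": None},
]
result = enrich_records(rows)
print([(r["order_id"], r["file_name"]) for r in result])  # [(1, 'a.json')]
```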

Step 3: Persisting Data in the Bronze Layer

Step 3.1: Writing to Delta Table

Purpose: Appends the validated, enriched records to the Bronze Delta table, a persistent, ACID-compliant data source.

Significance:
- Delta tables provide transactional consistency, fault tolerance, and table versioning (time travel) for downstream analytics.
- Append mode adds new records incrementally while preserving existing data.

Explanation:
- Inputs:
  - df: enriched DataFrame from Step 2.2.
  - target_table: fully qualified name of the Delta table (catalog.schema.table), as built by get_paths.
  - logger: optional logger for record counts and status messages.
- Outputs:
  - The batch is appended to the Bronze Delta table, and empty batches are skipped. The top-level file_name column is dropped before writing, but the source file name remains available in _metadata.file_name.

Code Walkthrough:

    def write_to_bronze_table(df, target_table, logger=None):
        """
        Writes enriched data into the Delta Bronze Table.
        """
        new_count = df.count()  # Triggers a Spark job; used to skip empty batches
        if new_count > 0:
            # file_name is dropped here; lineage remains in _metadata.file_name
            df.drop("file_name").write.mode("append").format("delta").saveAsTable(target_table)
            if logger:
                logger.info(f"[BATCH] Records written: {new_count}")  # Logged only after a successful write
        elif logger:
            logger.info(f"[BATCH] No new data to write for table: {target_table}")
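
The control flow of write_to_bronze_table can be exercised without a cluster: skip empty batches, drop file_name, append, then log. In this sketch a Python list stands in for the Delta table. append_batch is a hypothetical name, and the sketch does not model Delta's transactional guarantees.

```python
from __future__ import annotations

import logging
from typing import Any

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("bronze_sketch")


def append_batch(batch: list[dict[str, Any]], table: list[dict[str, Any]], table_name: str) -> int:
    """Append a batch to an in-memory stand-in for the Bronze table; return rows written."""
    if not batch:
        logger.info("[BATCH] No new data to write for table: %s", table_name)
        return 0
    # Drop the top-level file_name column, as write_to_bronze_table does before saving.
    rows = [{k: v for k, v in r.items() if k != "file_name"} for r in batch]
    table.extend(rows)  # append mode: existing rows are kept, new rows are added
    logger.info("[BATCH] Records written: %d", len(rows))  # logged only after the append
    return len(rows)


bronze: list[dict[str, Any]] = []
append_batch([{"order_id": 1, "file_name": "a.json"}], bronze, "dev_catalog.bronze.orders")
append_batch([], bronze, "dev_catalog.bronze.orders")  # skipped, table unchanged
print(bronze)  # [{'order_id': 1}]
```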

Step 4: Processed File Management

Step 4.1: Moving Processed Files

Purpose: Moves the JSON files in the raw directory (incoming_path) to the processed directory (loaded_path), keeping raw and processed data separate.

Significance:
- Prevents completed files from being processed again in later pipeline runs.
- Keeps the Landing Zone organized, with a clear split between pending and processed files.

Explanation:
- Inputs:
  - spark: SparkSession (part of the signature, but not used in the function body).
  - incoming_path: directory containing the raw files.
  - loaded_path: destination directory for processed files.
  - dbutils: Databricks Utilities, used here for dbutils.fs.ls and dbutils.fs.mv.
  - logger: optional logger that records each move and any failure.
- Outputs:
  - Every .json file directly under incoming_path is moved to loaded_path. The function does not check individual files, so files that contained records rejected in Step 2.1 are moved as well. The rejected records themselves remain in bad_files_path.

Code Walkthrough:

    from pathlib import Path

    def move_loaded_files(spark, incoming_path, loaded_path, dbutils, logger=None):
        """
        Moves processed files to the 'loaded files' directory.
        """
        try:
            files = dbutils.fs.ls(incoming_path)  # List entries directly under the raw directory (non-recursive)
            for f in files:
                if f.path.endswith(".json"):  # Move only JSON files
                    dest = f"{loaded_path}{Path(f.path).name}"  # loaded_path already ends with "/"
                    dbutils.fs.mv(f.path, dest)  # Move the file to the loaded directory
                    if logger:
                        logger.info(f"[BATCH] File moved from {f.path} to {dest}")
        except Exception as e:
            if logger:
                logger.error(f"[BATCH] File movement failed: {str(e)}")
            raise  # Re-raise so the caller sees the failure
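
dbutils is not available outside Databricks. For local unit tests, a stand-in with the same behaviour can be written with pathlib and shutil: it moves every .json file directly under the incoming folder. move_json_files is a hypothetical name, not part of the repository, and the directory names in the usage are throwaway placeholders.

```python
from __future__ import annotations

import shutil
import tempfile
from pathlib import Path


def move_json_files(incoming_dir: str, loaded_dir: str) -> list[Path]:
    """Move every *.json file directly under incoming_dir into loaded_dir."""
    src, dst = Path(incoming_dir), Path(loaded_dir)
    dst.mkdir(parents=True, exist_ok=True)  # create the destination if it does not exist
    moved: list[Path] = []
    for f in sorted(src.glob("*.json")):  # non-recursive, like dbutils.fs.ls plus endswith(".json")
        target = dst / f.name
        shutil.move(str(f), str(target))
        moved.append(target)
    return moved


# Usage against a throwaway directory tree.
with tempfile.TemporaryDirectory() as root:
    incoming, loaded = Path(root, "incoming"), Path(root, "loaded")
    incoming.mkdir()
    (incoming / "a.json").write_text("{}")
    (incoming / "notes.txt").write_text("not JSON")
    moved_names = [p.name for p in move_json_files(str(incoming), str(loaded))]
    remaining = sorted(p.name for p in incoming.iterdir())
    landed = sorted(p.name for p in loaded.iterdir())

print(moved_names, remaining, landed)  # ['a.json'] ['notes.txt'] ['a.json']
```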

Core Functions Table

Functions in the core components, with their module category and process step:

| Function Name | Utility | Config | Common | Infrastructure | Process Step | Description |
|---|---|---|---|---|---|---|
| `get_paths(conf, table_name)` | ✓ | - | - | - | Step 1.1 | Constructs all required paths for processing |
| `read_landing_data(spark, incoming_path, bad_files_path, json_schema)` | ✓ | - | - | - | Step 2.1 | Reads JSON data with schema validation |
| `enrich_and_clean_data(df, metadata_schema)` | ✓ | - | - | - | Step 2.2 | Adds metadata and cleans the DataFrame |
| `write_to_bronze_table(df, target_table, logger)` | ✓ | - | - | - | Step 3.1 | Writes data to the Bronze Delta table |
| `move_loaded_files(spark, incoming_path, loaded_path, dbutils, logger)` | ✓ | - | - | - | Step 4.1 | Moves processed files to the loaded directory |
| `process_table(table_name, spark, conf, schemas, dbutils, logger)` | ✓ | - | - | - | All steps | Orchestrates complete table processing |
| `prepare_environment(env)` | ✓ | - | - | - | Step 1.1 | Sets up the processing environment |
| `get_databricks_configuration(env, mode)` | - | ✓ | - | - | Step 1.1 | Returns the configuration for the specified environment |
| `get_svcs(env)` | ✓ | - | - | - | Step 1.1 | Initializes Databricks services |
| `create_schema(svcs, conf)` | - | - | - | ✓ | Step 1.1 | Creates the necessary database objects |

Source Code Repository

The source code for this pipeline is in the repository: DHO-PoC1/src/dhopoc1/landing_to_bronze_pyspark.py

When to Use Python Ingestion

| Scenario | Description | Benefits |
|---|---|---|
| Data Pipeline Development | Writing custom ingestion logic | Flexible implementation |
| Complex Transformations | Advanced data preprocessing | Custom logic control |
| API Integration | Connecting to external services | Native API support |
| Custom Validation | Implementing business rules | Granular validation |
| Error Handling | Detailed exception management | Custom error flows |
| Metadata Processing | File and schema management | Dynamic handling |
| Multi-source Integration | Combining different data sources | Unified processing |

When Not to Use Python Ingestion

| Scenario | Reason | Alternative |
|---|---|---|
| Simple Data Loading | Overcomplication of process | Auto Loader |
| High-Volume Streaming | Performance overhead | Structured Streaming |
| Standard ETL Tasks | Reinventing the wheel | Built-in connectors |
| Production Scheduling | Maintenance complexity | Workflow scheduler |
| Basic Transformations | Development overhead | SQL transformations |
| Quick Prototyping | Setup time | Notebook workflows |
| Standard Formats | Available built-in readers | Native readers |