
Article 4: Building Your First ETL Pipeline with Kestra


From Data Extraction to Loading - A Practical Guide

Introduction: Why ETL Still Matters in the Modern Data Stack

Remember when data engineering was "extract, transform, load"? Some say ETL is dead, replaced by ELT, reverse ETL, and data mesh. But here's the truth: ETL is more alive than ever, just smarter. And Kestra makes it accessible to everyone, not just senior data engineers.

In this article, you'll build a complete production-ready ETL pipeline that:

  • Extracts data from multiple sources (API, CSV, database)

  • Transforms with proper error handling and validation

  • Loads to a data warehouse

  • Monitors data quality

  • Sends alerts on failures

  • Scales to handle millions of records

The Business Problem: ACME Corp needs daily sales reports from multiple sources. Currently, marketing runs manual Excel reports, finance has SQL queries, and sales uses a legacy system. Your mission: automate everything into a single source of truth.


Phase 1: Blueprint Your ETL Pipeline

Understand Your Data Sources

Before writing a single line of YAML, document your sources:

Source          Type               Frequency  Volume        Format  Owner
Stripe API      Payment processor  Daily      10K records   JSON    Finance
Shopify         E-commerce         Hourly     50K records   CSV     Marketing
PostgreSQL      Internal DB        Daily      100K records  SQL     Sales
Google Sheets   Manual input       Weekly     500 records   CSV     Ops

Design Your Target Schema

-- Target: Data warehouse (Snowflake/BigQuery/PostgreSQL)
CREATE TABLE sales_fact (
    sale_id VARCHAR(100) PRIMARY KEY,
    transaction_date DATE,
    customer_id VARCHAR(50),
    product_id VARCHAR(50),
    quantity INTEGER,
    unit_price DECIMAL(10,2),
    total_amount DECIMAL(10,2),
    payment_method VARCHAR(50),
    region VARCHAR(100),
    sales_rep VARCHAR(100),
    source_system VARCHAR(50),
    loaded_at TIMESTAMP,
    batch_id VARCHAR(100)
);

CREATE TABLE customer_dim (
    customer_id VARCHAR(50) PRIMARY KEY,
    customer_name VARCHAR(255),
    email VARCHAR(255),
    join_date DATE,
    customer_tier VARCHAR(50),
    lifetime_value DECIMAL(10,2),
    last_updated TIMESTAMP
);
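Before wiring up any flows, it helps to sanity-check that incoming records actually fit this schema. A minimal Python sketch (the column set is copied from the DDL above; the helper name and check logic are ours, not part of Kestra):

```python
# Hypothetical pre-load check: verify a record matches the sales_fact
# column set before loading. Column names mirror the DDL above.
SALES_FACT_COLUMNS = {
    "sale_id", "transaction_date", "customer_id", "product_id",
    "quantity", "unit_price", "total_amount", "payment_method",
    "region", "sales_rep", "source_system", "loaded_at", "batch_id",
}

def schema_violations(record: dict) -> list[str]:
    """Return a list of problems; an empty list means the record fits."""
    problems = [f"missing: {c}" for c in SALES_FACT_COLUMNS - record.keys()]
    problems += [f"unexpected: {c}" for c in record.keys() - SALES_FACT_COLUMNS]
    return problems

row = {c: None for c in SALES_FACT_COLUMNS}
print(schema_violations(row))                # []
print(len(schema_violations({"sale_id": "s1"})))  # 12 missing columns
```

The same idea extends to type checks; a full implementation would also validate DECIMAL precision and date formats.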

Define Success Criteria

Your pipeline must:

  1. Process all data within 2 hours

  2. Maintain 99.9% data accuracy

  3. Handle missing data gracefully

  4. Send alerts on failures within 15 minutes

  5. Support backfills for 90 days
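Criteria like these are easiest to enforce if you encode them as a check the pipeline itself can run at the end of each execution. A small sketch using the thresholds above (the function name and signature are illustrative, not a Kestra API):

```python
from datetime import timedelta

# Hypothetical SLA gate mirroring the criteria above: thresholds come
# from the list, the function itself is our own sketch.
MAX_RUNTIME = timedelta(hours=2)
MIN_ACCURACY = 0.999  # 99.9% data accuracy

def sla_breaches(runtime: timedelta, valid_records: int, total_records: int) -> list[str]:
    breaches = []
    if runtime > MAX_RUNTIME:
        breaches.append(f"runtime {runtime} exceeds {MAX_RUNTIME}")
    accuracy = valid_records / total_records if total_records else 1.0
    if accuracy < MIN_ACCURACY:
        breaches.append(f"accuracy {accuracy:.4%} below {MIN_ACCURACY:.1%}")
    return breaches

print(sla_breaches(timedelta(minutes=45), 99_950, 100_000))  # []
print(sla_breaches(timedelta(hours=3), 99_800, 100_000))     # two breaches
```

A non-empty result would feed the alerting path you'll build later in this article.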


Phase 2: Setting Up Your Development Environment

Project Structure

# Create project structure
mkdir -p etl-pipeline/{flows,scripts,config,tests}
mkdir -p etl-pipeline/flows/{extract,transform,load,monitor}
mkdir -p etl-pipeline/scripts/{python,sql}
mkdir -p etl-pipeline/config/{dev,prod}
mkdir -p etl-pipeline/tests/{unit,integration}

# Initialize git
cd etl-pipeline
git init
echo "# ACME Sales ETL Pipeline" > README.md

Configuration Files

# config/dev/kestra.yml
kestra:
  server:
    basic-auth:
      enabled: true
      username: admin
      password: ${DEV_PASSWORD:?}

  repository:
    type: postgres
    postgres:
      host: localhost
      port: 5432
      database: kestra_dev

  storage:
    type: local
    local:
      base-path: ./storage

variables:
  environment: dev
  api_timeout: PT5M
  batch_size: 10000
  alert_email: dev-team@acme.com

secrets:
  stripe_api_key: ${STRIPE_API_KEY}
  shopify_token: ${SHOPIFY_TOKEN}
  db_password: ${DB_PASSWORD}
# config/prod/kestra.yml  
kestra:
  server:
    basic-auth:
      enabled: true
      username: ${ADMIN_USER}
      password: ${ADMIN_PASSWORD}

  repository:
    type: postgres
    postgres:
      host: prod-db.acme.com
      port: 5432
      database: kestra_prod

  storage:
    type: s3
    s3:
      bucket: acme-kestra-prod
      region: us-east-1

variables:
  environment: production
  api_timeout: PT10M
  batch_size: 50000
  alert_email: data-eng@acme.com
  slack_channel: "#data-alerts"

secrets:
  stripe_api_key: ${STRIPE_API_KEY}
  shopify_token: ${SHOPIFY_TOKEN}
  db_password: ${DB_PASSWORD_PROD}
  snowflake_password: ${SNOWFLAKE_PASSWORD}

Environment Variables

# .env.dev
DEV_PASSWORD=dev_password_123
STRIPE_API_KEY=sk_test_abc123
SHOPIFY_TOKEN=shpat_xyz789
DB_PASSWORD=postgres_pass
SNOWFLAKE_PASSWORD=snowflake_dev_pass

# .env.prod (keep secret!)
ADMIN_USER=admin
ADMIN_PASSWORD=super_secure_prod_pass_456
STRIPE_API_KEY=sk_live_xyz789
SHOPIFY_TOKEN=shpat_live_abc123
DB_PASSWORD_PROD=prod_super_secure_pass
SNOWFLAKE_PASSWORD=snowflake_prod_secure_pass

Docker Compose for Development

# docker-compose.yml
version: '3.8'

services:
  kestra:
    image: kestra/kestra:latest
    ports:
      - "8080:8080"
    environment:
      - KESTRA_CONFIGURATION=file:///app/config/dev/kestra.yml
    volumes:
      - ./flows:/app/flows
      - ./scripts:/app/scripts
      - ./config/dev:/app/config
      - ./storage:/app/storage
      - ./logs:/app/logs
    env_file:
      - .env.dev
    depends_on:
      - postgres
      - minio

  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_DB: kestra_dev
      POSTGRES_USER: kestra
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init-db:/docker-entrypoint-initdb.d

  minio:
    image: minio/minio
    command: server /data --console-address ":9090"
    ports:
      - "9000:9000"
      - "9090:9090"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    volumes:
      - minio_data:/data

  # Optional: Add test data sources
  mock-stripe:
    image: stoplight/prism:4
    command: mock -h 0.0.0.0 https://raw.githubusercontent.com/stripe/openapi/master/openapi/spec3.json
    ports:
      - "4010:4010"

volumes:
  postgres_data:
  minio_data:


Phase 3: Building the Extract Layer

Source 1: Stripe Payments API

# flows/extract/stripe-payments.yml
id: extract-stripe-payments
namespace: acme.sales.etl
revision: 1
description: |
  Extract payment data from Stripe API.

  ## Data Characteristics
  - Daily volume: ~10,000 transactions
  - Historical data available for 90 days
  - Includes: charges, customers, refunds

  ## Rate Limits
  - 100 requests per second
  - Paginated responses (limit 100 per page)

labels:
  source: stripe
  data-type: transactions
  pii: true
  compliance: pci-dss

variables:
  stripe_base_url: "https://api.stripe.com/v1"
  default_limit: 100
  max_pages: 1000

inputs:
  - name: start_date
    type: DATETIME
    description: "Start date for data extraction"
    defaults: "{{ now() | dateAdd(-1, 'DAYS') | date('yyyy-MM-dd') }}"

  - name: end_date
    type: DATETIME
    description: "End date for data extraction"
    defaults: "{{ now() | date('yyyy-MM-dd') }}"

  - name: data_types
    type: ARRAY
    description: "Types of data to extract"
    defaults: ["charges", "customers", "refunds", "payment_intents"]

tasks:
  # Task 1: Validate inputs
  - id: validate_dates
    type: io.kestra.plugin.core.condition.Expression
    expression: >
      {{ inputs.start_date | date('yyyy-MM-dd') <= 
         inputs.end_date | date('yyyy-MM-dd') }}
    errorMessage: "Start date must be before or equal to end date"

  # Task 2: Calculate date range
  - id: generate_date_range
    type: io.kestra.plugin.core.debug.Return
    format: >
      {{ range(inputs.start_date | date('yyyy-MM-dd'), 
               inputs.end_date | date('yyyy-MM-dd'), 
               'P1D') | tojson }}

  # Task 3: Extract each data type in parallel
  - id: extract_data_types
    type: io.kestra.plugin.core.flow.EachParallel
    value: "{{ inputs.data_types }}"
    tasks:
      - id: "extract_{{ task.value }}"
        type: io.kestra.plugin.core.http.Request
        description: "Extract {{ task.value }} from Stripe"
        uri: "{{ vars.stripe_base_url }}/{{ task.value }}"
        method: GET
        headers:
          Authorization: "Bearer {{ secret('STRIPE_API_KEY') }}"
        queryParams:
          limit: "{{ vars.default_limit }}"
          created[gte]: "{{ inputs.start_date | date('timestamp') }}"
          created[lte]: "{{ inputs.end_date | date('timestamp') }}"

        # Pagination handling
        pagination:
          type: next_page
          results: body.data
          nextPageToken: body.has_more
          nextPageTokenPath: body.has_more
          nextPageCursorPath: body.data[-1].id
          nextPageQueryParams:
            starting_after: "{{ nextPageToken }}"

        # Rate limiting
        rateLimit:
          requestsPerSecond: 10

        # Error handling
        retry:
          type: exponential
          maxAttempt: 5
          delay: PT10S

        # Store results
        store: true
        outputFile: "stripe_{{ task.value }}_{{ execution.startDate | date('yyyyMMddHHmmss') }}.json"

  # Task 4: Merge and validate extracted data
  - id: merge_stripe_data
    type: io.kestra.plugin.scripts.python.Script
    description: "Merge and validate Stripe data"
    inputFiles:
      {% for data_type in inputs.data_types %}
      stripe_{{ data_type }}.json: "{{ outputs.extract_data_types.outputs['extract_' + data_type].uri }}"
      {% endfor %}
    script: |
      import json
      import pandas as pd
      from datetime import datetime

      # Load all extracted data
      all_data = {}
      for data_type in {{ inputs.data_types | tojson }}:
          with open(f'stripe_{data_type}.json', 'r') as f:
              data = json.load(f)
              all_data[data_type] = data['data']

      # Validate data
      validation_results = {
          'extraction_time': datetime.now().isoformat(),
          'total_records': sum(len(data) for data in all_data.values()),
          'record_counts': {k: len(v) for k, v in all_data.items()},
          'date_range': {
              'start': '{{ inputs.start_date }}',
              'end': '{{ inputs.end_date }}'
          }
      }

      # Check for missing required fields
      required_fields = {
          'charges': ['id', 'amount', 'currency', 'customer'],
          'customers': ['id', 'email', 'created'],
          'refunds': ['id', 'charge', 'amount']
      }

      validation_results['missing_fields'] = {}
      for data_type, records in all_data.items():
          if data_type in required_fields:
              for record in records[:10]:  # Sample check
                  missing = [field for field in required_fields[data_type] 
                            if field not in record]
                  if missing:
                      validation_results['missing_fields'].setdefault(data_type, []).append(missing)

      # Save merged data
      merged_data = {}
      for data_type, records in all_data.items():
          df = pd.DataFrame(records)
          df.to_parquet(f'stripe_{data_type}_merged.parquet', index=False)
          merged_data[data_type] = f'stripe_{data_type}_merged.parquet'

      # Save validation results
      with open('validation_report.json', 'w') as f:
          json.dump(validation_results, f, indent=2)

      # Output to Kestra
      print(json.dumps({
          'validation': validation_results,
          'files': merged_data
      }))

    taskRunner:
      type: io.kestra.plugin.core.runner.Process
      memory: 2Gi
      cpu: 1
      image: python:3.11-slim

    dependsOn:
      - extract_data_types

  # Task 5: Send extraction summary
  - id: send_extraction_summary
    type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
    url: "{{ secret('SLACK_WEBHOOK_URL') }}"
    payload: |
      {
        "blocks": [
          {
            "type": "header",
            "text": {
              "type": "plain_text",
              "text": "๐Ÿ“ฅ Stripe Data Extraction Complete"
            }
          },
          {
            "type": "section",
            "fields": [
              {
                "type": "mrkdwn",
                "text": "*Date Range:*\n{{ inputs.start_date }} to {{ inputs.end_date }}"
              },
              {
                "type": "mrkdwn",
                "text": "*Total Records:*\n{{ outputs.merge_stripe_data.vars.validation.total_records }}"
              }
            ]
          },
          {
            "type": "section",
            "text": {
              "type": "mrkdwn",
              "text": "{% for data_type, count in outputs.merge_stripe_data.vars.validation.record_counts.items() %}โ€ข *{{ data_type }}*: {{ count }} records\n{% endfor %}"
            }
          }
        ]
      }

    dependsOn:
      - merge_stripe_data

triggers:
  - id: daily_extraction
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 1 * * *"  # 1 AM daily
    timezone: UTC
    inputs:
      start_date: "{{ now() | dateAdd(-1, 'DAYS') | date('yyyy-MM-dd') }}"
      end_date: "{{ now() | date('yyyy-MM-dd') }}"

timeout: PT1H
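The `pagination` block above handles cursoring declaratively. If you haven't worked with Stripe-style cursors before, here is what the same loop looks like in plain Python, with a fake in-memory endpoint standing in for the real HTTP call (the function names and the crude rate-limit approach are illustrative):

```python
import time

# Stripe-style list endpoints return {"data": [...], "has_more": bool}
# and accept starting_after=<last id seen>. fetch_page is a stand-in
# for the HTTP request the Kestra task performs.
def paginate(fetch_page, limit=100, max_pages=1000, requests_per_second=10):
    records, cursor = [], None
    for _ in range(max_pages):
        page = fetch_page(limit=limit, starting_after=cursor)
        records.extend(page["data"])
        if not page["has_more"]:
            break
        cursor = page["data"][-1]["id"]       # next cursor = last id seen
        time.sleep(1 / requests_per_second)   # crude client-side rate limit
    return records

# Fake endpoint serving 250 records in pages of `limit`:
DATA = [{"id": f"ch_{i}"} for i in range(250)]
def fake_fetch(limit, starting_after=None):
    start = 0 if starting_after is None else next(
        i + 1 for i, r in enumerate(DATA) if r["id"] == starting_after)
    return {"data": DATA[start:start + limit],
            "has_more": start + limit < len(DATA)}

print(len(paginate(fake_fetch)))  # 250
```

Letting Kestra's `pagination` and `rateLimit` options do this for you means less code to maintain, but knowing the underlying loop helps when debugging partial extractions.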

Source 2: Shopify E-commerce Data

# flows/extract/shopify-orders.yml
id: extract-shopify-orders
namespace: acme.sales.etl
description: Extract order data from Shopify

tasks:
  - id: get_order_ids
    type: io.kestra.plugin.core.http.Request
    uri: "https://{{ secret('SHOPIFY_STORE') }}.myshopify.com/admin/api/2024-01/orders.json"
    method: GET
    headers:
      X-Shopify-Access-Token: "{{ secret('SHOPIFY_TOKEN') }}"
    queryParams:
      status: any
      created_at_min: "{{ execution.startDate | dateAdd(-1, 'DAYS') | date('yyyy-MM-dd') }}T00:00:00Z"
      limit: 250
    store: true
    pagination:
      type: header_link
      maxPage: 100

  - id: enrich_orders
    type: io.kestra.plugin.core.flow.EachParallel
    value: "{{ outputs.get_order_ids.body.orders | map('id') | list }}"
    maxParallel: 5
    tasks:
      - id: "get_order_{{ task.value }}"
        type: io.kestra.plugin.core.http.Request
        uri: "https://{{ secret('SHOPIFY_STORE') }}.myshopify.com/admin/api/2024-01/orders/{{ task.value }}.json"
        method: GET
        headers:
          X-Shopify-Access-Token: "{{ secret('SHOPIFY_TOKEN') }}"
        queryParams:
          fields: "id,email,created_at,financial_status,total_price,line_items,customer"
        store: true
        outputFile: "shopify_order_{{ task.value }}.json"

Source 3: Internal PostgreSQL Database

# flows/extract/internal-sales.yml
id: extract-internal-sales
namespace: acme.sales.etl
description: Extract sales data from internal database

tasks:
  - id: extract_sales_transactions
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: "jdbc:postgresql://{{ secret('DB_HOST') }}:5432/{{ secret('DB_NAME') }}"
    username: "{{ secret('DB_USER') }}"
    password: "{{ secret('DB_PASSWORD') }}"
    sql: |
      WITH daily_sales AS (
        SELECT 
          t.transaction_id,
          t.transaction_date,
          c.customer_id,
          c.customer_name,
          p.product_id,
          p.product_name,
          t.quantity,
          t.unit_price,
          t.total_amount,
          t.sales_rep_id,
          sr.region,
          'internal' as source_system,
          CURRENT_TIMESTAMP as extracted_at
        FROM transactions t
        JOIN customers c ON t.customer_id = c.customer_id
        JOIN products p ON t.product_id = p.product_id
        JOIN sales_reps sr ON t.sales_rep_id = sr.rep_id
        WHERE t.transaction_date >= '{{ execution.startDate | dateAdd(-1, 'DAYS') | date('yyyy-MM-dd') }}'
          AND t.transaction_date < '{{ execution.startDate | date('yyyy-MM-dd') }}'
          AND t.status = 'completed'
      )
      SELECT * FROM daily_sales
    fetch: true
    fetchSize: 10000
    chunkSize: 5000

  - id: save_to_parquet
    type: io.kestra.plugin.scripts.python.Script
    inputFiles:
      sales_data.json: |
        {{ outputs.extract_sales_transactions.rows | tojson }}
    script: |
      import pandas as pd
      import json

      with open('sales_data.json') as f:
          data = json.load(f)
      df = pd.DataFrame(data)

      # Convert date columns
      date_columns = ['transaction_date', 'extracted_at']
      for col in date_columns:
          if col in df.columns:
              df[col] = pd.to_datetime(df[col])

      # Save to parquet
      df.to_parquet('internal_sales.parquet', index=False)
      print(f"Saved {len(df)} records to internal_sales.parquet")

    dependsOn:
      - extract_sales_transactions

Source 4: Google Sheets (Manual Input)

# flows/extract/google-sheets.yml
id: extract-google-sheets
namespace: acme.sales.etl
description: Extract manual inputs from Google Sheets

tasks:
  - id: authenticate_google
    type: io.kestra.plugin.gcp.Oauth
    scopes:
      - https://www.googleapis.com/auth/spreadsheets.readonly
    credentials: "{{ secret('GOOGLE_CREDENTIALS_JSON') }}"

  - id: extract_sheet_data
    type: io.kestra.plugin.gcp.sheets.Read
    spreadsheetId: "{{ secret('GOOGLE_SHEET_ID') }}"
    range: "SalesData!A:Z"
    store: true

  - id: convert_to_structured
    type: io.kestra.plugin.scripts.python.Script
    inputFiles:
      sheet_data.json: "{{ outputs.extract_sheet_data.uri }}"
    script: |
      import pandas as pd
      import json

      with open('sheet_data.json', 'r') as f:
          data = json.load(f)

      # Assuming first row is headers
      rows = data['values']
      if len(rows) < 2:
          raise ValueError("No data in sheet")

      headers = rows[0]
      data_rows = rows[1:]

      df = pd.DataFrame(data_rows, columns=headers)

      # Clean up data
      df = df.dropna(how='all')  # Remove empty rows

      # Standardize column names
      df.columns = df.columns.str.lower().str.replace(' ', '_')

      # Save
      df.to_parquet('manual_sales.parquet', index=False)
      print(f"Extracted {len(df)} manual entries")

Phase 4: Building the Transform Layer

Transformation Pipeline Architecture

# flows/transform/master-transformer.yml
id: master-transformer
namespace: acme.sales.etl
description: Orchestrate all data transformations

tasks:
  # Parallel transformation of all sources
  - id: transform_all_sources
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: transform_stripe
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.sales.etl
        flowId: transform-stripe-payments
        wait: true
        transmit:
          - name: input_files
            value: "{{ parent.outputs.extract_stripe_payments.outputs.merge_stripe_data.vars.files }}"

      - id: transform_shopify
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.sales.etl
        flowId: transform-shopify-orders
        wait: true

      - id: transform_internal
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.sales.etl  
        flowId: transform-internal-sales
        wait: true

      - id: transform_manual
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.sales.etl
        flowId: transform-google-sheets
        wait: true

  # Deduplicate across sources
  - id: deduplicate_records
    type: io.kestra.plugin.core.flow.Subflow
    namespace: acme.sales.etl
    flowId: deduplicate-sales
    wait: true
    transmit:
      - name: stripe_data
        value: "{{ outputs.transform_all_sources.outputs.transform_stripe.outputs.transformed_files }}"
      - name: shopify_data
        value: "{{ outputs.transform_all_sources.outputs.transform_shopify.outputs.transformed_files }}"
      - name: internal_data
        value: "{{ outputs.transform_all_sources.outputs.transform_internal.outputs.transformed_files }}"
      - name: manual_data
        value: "{{ outputs.transform_all_sources.outputs.transform_manual.outputs.transformed_files }}"

  # Enrich with customer data
  - id: enrich_customer_data
    type: io.kestra.plugin.core.flow.Subflow
    namespace: acme.sales.etl
    flowId: enrich-customer-dimension
    wait: true
    transmit:
      - name: sales_data
        value: "{{ outputs.deduplicate_records.outputs.deduplicated_file }}"

  # Calculate metrics
  - id: calculate_metrics
    type: io.kestra.plugin.core.flow.Subflow
    namespace: acme.sales.etl
    flowId: calculate-sales-metrics
    wait: true
    transmit:
      - name: enriched_data
        value: "{{ outputs.enrich_customer_data.outputs.enriched_file }}"

  # Generate data quality report
  - id: data_quality_check
    type: io.kestra.plugin.core.flow.Subflow
    namespace: acme.data.quality
    flowId: run-data-quality-checks
    wait: true
    transmit:
      - name: transformed_data
        value: "{{ outputs.calculate_metrics.outputs.final_data }}"
      - name: validation_rules
        value: |
          {
            "rules": [
              {"field": "total_amount", "type": "not_null"},
              {"field": "customer_id", "type": "not_null"},
              {"field": "transaction_date", "type": "not_null"},
              {"field": "total_amount", "type": "positive"},
              {"field": "quantity", "type": "positive_integer"}
            ]
          }
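The `deduplicate-sales` subflow itself isn't shown here, but the core idea is simple: the same sale can arrive from several systems (a Shopify order may also settle through Stripe), so you pick one winner per natural key. A pure-Python sketch, assuming a (customer_id, transaction_date, total_amount) key and a source-priority rule (both are assumptions for illustration, not ACME policy):

```python
# Lower number wins when the same sale appears in multiple sources.
SOURCE_PRIORITY = {"internal": 0, "stripe": 1, "shopify": 2, "manual": 3}

def deduplicate(records):
    best = {}
    for r in records:
        key = (r["customer_id"], r["transaction_date"], r["total_amount"])
        kept = best.get(key)
        if kept is None or (SOURCE_PRIORITY[r["source_system"]]
                            < SOURCE_PRIORITY[kept["source_system"]]):
            best[key] = r  # keep the higher-priority copy
    return list(best.values())

rows = [
    {"customer_id": "c1", "transaction_date": "2024-05-01",
     "total_amount": 42.0, "source_system": "shopify"},
    {"customer_id": "c1", "transaction_date": "2024-05-01",
     "total_amount": 42.0, "source_system": "stripe"},
    {"customer_id": "c2", "transaction_date": "2024-05-01",
     "total_amount": 10.0, "source_system": "manual"},
]
print(len(deduplicate(rows)))  # 2: the duplicate c1 sale collapses to one row
```

In production you would run the same logic over the Parquet files with pandas `drop_duplicates`, but the priority rule is the part worth getting right.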

Stripe Transformation Flow

# flows/transform/transform-stripe-payments.yml
id: transform-stripe-payments
namespace: acme.sales.etl
description: Transform raw Stripe data into standardized format

inputs:
  - name: input_files
    type: JSON
    description: "Map of data type to file path"

  - name: transformation_date
    type: DATETIME
    defaults: "{{ now() }}"

tasks:
  - id: load_and_transform
    type: io.kestra.plugin.scripts.python.Script
    description: "Transform Stripe payments data"
    inputFiles:
      {% for data_type, file_path in inputs.input_files.items() %}
      {{ data_type }}.parquet: "{{ file_path }}"
      {% endfor %}
      transformation_rules.py: |
        # Custom transformation rules
        def transform_stripe_charge(charge):
            """Transform a Stripe charge record"""
            return {
                'sale_id': f"stripe_{charge['id']}",
                'transaction_date': charge['created'],
                'customer_id': charge.get('customer'),
                'product_id': 'stripe_payment',
                'quantity': 1,
                'unit_price': charge['amount'] / 100,  # Convert from cents
                'total_amount': charge['amount'] / 100,
                'payment_method': charge.get('payment_method_details', {}).get('type', 'unknown'),
                'region': charge.get('billing_details', {}).get('address', {}).get('country'),
                'sales_rep': 'stripe',
                'source_system': 'stripe',
                'metadata': {
                    'currency': charge.get('currency'),
                    'description': charge.get('description'),
                    'status': charge.get('status')
                }
            }

        def transform_stripe_customer(customer):
            """Transform a Stripe customer record"""
            return {
                'customer_id': customer['id'],
                'customer_name': customer.get('name'),
                'email': customer.get('email'),
                'join_date': customer.get('created'),
                'customer_tier': 'standard',
                'lifetime_value': customer.get('metadata', {}).get('lifetime_value', 0),
                'metadata': {
                    'description': customer.get('description'),
                    'phone': customer.get('phone'),
                    'address': customer.get('address')
                }
            }
    script: |
      import pandas as pd
      import numpy as np
      from datetime import datetime
      import json
      from pathlib import Path

      # Import transformation rules
      import sys
      sys.path.append('.')
      from transformation_rules import transform_stripe_charge, transform_stripe_customer

      # Load data
      charges_df = pd.read_parquet('charges.parquet')
      customers_df = pd.read_parquet('customers.parquet')
      refunds_df = pd.read_parquet('refunds.parquet')

      # Transform charges
      print(f"Transforming {len(charges_df)} charges...")
      transformed_charges = []
      for _, charge in charges_df.iterrows():
          try:
              transformed = transform_stripe_charge(charge.to_dict())
              transformed_charges.append(transformed)
          except Exception as e:
              print(f"Error transforming charge {charge.get('id')}: {e}")

      # Transform customers
      print(f"Transforming {len(customers_df)} customers...")
      transformed_customers = []
      for _, customer in customers_df.iterrows():
          try:
              transformed = transform_stripe_customer(customer.to_dict())
              transformed_customers.append(transformed)
          except Exception as e:
              print(f"Error transforming customer {customer.get('id')}: {e}")

      # Handle refunds (adjust charges)
      if len(refunds_df) > 0:
          print(f"Processing {len(refunds_df)} refunds...")
          refunds_by_charge = refunds_df.groupby('charge')
          # This would adjust the charges data
          # Implementation depends on business rules

      # Convert to DataFrames
      sales_df = pd.DataFrame(transformed_charges)
      customers_df = pd.DataFrame(transformed_customers)

      # Data validation
      validation_errors = []

      # Check for nulls in required fields
      required_sales_fields = ['sale_id', 'transaction_date', 'total_amount']
      for field in required_sales_fields:
          null_count = sales_df[field].isnull().sum()
          if null_count > 0:
              validation_errors.append(f"Sales {field}: {null_count} null values")

      # Check for negative amounts
      negative_amounts = (sales_df['total_amount'] < 0).sum()
      if negative_amounts > 0:
          validation_errors.append(f"Sales: {negative_amounts} negative amounts")

      # Check for future dates
      today = pd.Timestamp.now()
      future_dates = (pd.to_datetime(sales_df['transaction_date']) > today).sum()
      if future_dates > 0:
          validation_errors.append(f"Sales: {future_dates} future dates")

      # Save validation report
      validation_report = {
          'transformation_date': datetime.now().isoformat(),
          'records_processed': {
              'sales': len(sales_df),
              'customers': len(customers_df)
          },
          'validation_errors': validation_errors,
          'error_rate': len(validation_errors) / (len(sales_df) + len(customers_df)) if (len(sales_df) + len(customers_df)) > 0 else 0
      }

      with open('validation_report.json', 'w') as f:
          json.dump(validation_report, f, indent=2)

      # Save transformed data
      sales_df.to_parquet('stripe_sales_transformed.parquet', index=False)
      customers_df.to_parquet('stripe_customers_transformed.parquet', index=False)

      # Output to Kestra
      output = {
          'transformed_files': {
              'sales': 'stripe_sales_transformed.parquet',
              'customers': 'stripe_customers_transformed.parquet'
          },
          'validation': validation_report,
          'summary': {
              'total_sales': len(sales_df),
              'total_customers': len(customers_df),
              'total_revenue': sales_df['total_amount'].sum() if len(sales_df) > 0 else 0
          }
      }

      print(json.dumps(output))

    taskRunner:
      type: io.kestra.plugin.core.runner.Process
      memory: 4Gi
      cpu: 2
      image: python:3.11-slim

  - id: handle_validation_errors
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ outputs.load_and_transform.vars.validation.error_rate }}"
    cases:
      0:
        - id: log_success
          type: io.kestra.plugin.core.log.Log
          message: "Stripe transformation successful with 0 validation errors"

      0.01:
        - id: log_warning
          type: io.kestra.plugin.core.log.Log
          message: "Stripe transformation completed with {{ (outputs.load_and_transform.vars.validation.error_rate * 100) | round(2) }}% error rate"

      default:
        - id: alert_high_error_rate
          type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
          url: "{{ secret('SLACK_WEBHOOK_URL') }}"
          payload: |
            {
              "blocks": [
                {
                  "type": "header",
                  "text": {
                    "type": "plain_text",
                    "text": "โš ๏ธ High Error Rate in Stripe Transformation"
                  }
                },
                {
                  "type": "section",
                  "text": {
                    "type": "mrkdwn",
                    "text": "Error rate: *{{ (outputs.load_and_transform.vars.validation.error_rate * 100) | round(2) }}%*\n\nValidation errors:\n{% for error in outputs.load_and_transform.vars.validation.validation_errors %}- {{ error }}\n{% endfor %}"
                  }
                }
              ]
            }
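The transform script above deliberately leaves refund handling open ("implementation depends on business rules"). One plausible rule, purely as an illustration and not ACME's actual policy, is to net each charge's amount down by the sum of its refunds:

```python
# Assumption: a refund reduces its parent charge rather than producing a
# separate negative sale row. Amounts are in cents, as in Stripe payloads.
def net_refunds(charges, refunds):
    refunded = {}
    for r in refunds:
        refunded[r["charge"]] = refunded.get(r["charge"], 0) + r["amount"]
    return [
        {**c, "amount": c["amount"] - refunded.get(c["id"], 0)}
        for c in charges
    ]

charges = [{"id": "ch_1", "amount": 5000}, {"id": "ch_2", "amount": 2000}]
refunds = [{"charge": "ch_1", "amount": 1500}]
print(net_refunds(charges, refunds))  # ch_1 netted to 3500, ch_2 unchanged
```

Other shops prefer to emit refunds as separate negative-amount rows so reports can show gross and net revenue; pick whichever matches your finance team's reporting.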

Data Quality Transformation

# flows/transform/data-quality-checks.yml
id: run-data-quality-checks
namespace: acme.data.quality
description: Run comprehensive data quality checks

inputs:
  - name: transformed_data
    type: STRING
    description: "Path to transformed data file"

  - name: validation_rules
    type: JSON
    description: "Data quality validation rules"

tasks:
  - id: run_great_expectations
    type: io.kestra.plugin.greatexpectations.Validation
    expectationSuite: |
      {
        "expectation_suite_name": "sales_data_suite",
        "expectations": [
          {
            "expectation_type": "expect_column_values_to_not_be_null",
            "kwargs": {
              "column": "sale_id"
            }
          },
          {
            "expectation_type": "expect_column_values_to_be_between",
            "kwargs": {
              "column": "total_amount",
              "min_value": 0,
              "max_value": 1000000
            }
          },
          {
            "expectation_type": "expect_column_values_to_be_in_set",
            "kwargs": {
              "column": "source_system",
              "value_set": ["stripe", "shopify", "internal", "manual"]
            }
          }
        ]
      }
    data:
      type: file
      path: "{{ inputs.transformed_data }}"

  - id: generate_data_profiling
    type: io.kestra.plugin.scripts.python.Script
    inputFiles:
      data.parquet: "{{ inputs.transformed_data }}"
    script: |
      import pandas as pd
      import json
      from ydata_profiling import ProfileReport

      # Load data
      df = pd.read_parquet('data.parquet')

      # Generate profile
      profile = ProfileReport(df, title="Sales Data Profile")
      profile.to_file("data_profile.html")

      # Calculate statistics
      stats = {
          'row_count': len(df),
          'column_count': len(df.columns),
          'missing_values': df.isnull().sum().to_dict(),
          'data_types': {col: str(dtype) for col, dtype in df.dtypes.items()},
          'numeric_stats': {},
          'categorical_stats': {}
      }

      # Numeric columns
      numeric_cols = df.select_dtypes(include=['number']).columns
      for col in numeric_cols:
          stats['numeric_stats'][col] = {
              'mean': float(df[col].mean()),
              'std': float(df[col].std()),
              'min': float(df[col].min()),
              'max': float(df[col].max()),
              'median': float(df[col].median())
          }

      # Categorical columns
      categorical_cols = df.select_dtypes(include=['object']).columns
      for col in categorical_cols:
          stats['categorical_stats'][col] = {
              'unique_count': int(df[col].nunique()),
              'top_value': df[col].mode().iloc[0] if not df[col].mode().empty else None,
              'top_frequency': int(df[col].value_counts().iloc[0]) if not df[col].value_counts().empty else 0
          }

      # Save stats
      with open('data_statistics.json', 'w') as f:
          json.dump(stats, f, indent=2)

      print(json.dumps({'profile_file': 'data_profile.html', 'statistics': stats}))
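If you aren't running the Great Expectations plugin, the same three expectations in the suite above can be restated as plain row-level checks. This is our own restatement for readers without the plugin, not the plugin's API:

```python
# Plain-Python equivalent of the three expectations in the suite above:
# sale_id not null, total_amount in [0, 1_000_000], source_system in set.
VALID_SOURCES = {"stripe", "shopify", "internal", "manual"}

def check_row(row: dict) -> list[str]:
    failures = []
    if row.get("sale_id") is None:
        failures.append("sale_id is null")
    amount = row.get("total_amount")
    if amount is None or not (0 <= amount <= 1_000_000):
        failures.append(f"total_amount out of range: {amount}")
    if row.get("source_system") not in VALID_SOURCES:
        failures.append(f"unknown source_system: {row.get('source_system')}")
    return failures

good = {"sale_id": "s1", "total_amount": 99.5, "source_system": "stripe"}
bad = {"sale_id": None, "total_amount": -5, "source_system": "sap"}
print(check_row(good))       # []
print(len(check_row(bad)))   # 3
```

Great Expectations adds suite management, docs, and result stores on top; the checks themselves are this simple.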

Phase 5: Building the Load Layer

Data Warehouse Loading Strategy

# flows/load/load-to-warehouse.yml
id: load-to-warehouse
namespace: acme.sales.etl
description: Load transformed data to data warehouse

variables:
  warehouse_type: snowflake  # or bigquery, redshift, postgresql
  load_strategy: incremental  # or full, upsert
  batch_id: "{{ execution.id }}"

inputs:
  - name: sales_data_path
    type: STRING
    description: "Path to transformed sales data"

  - name: customer_data_path
    type: STRING
    description: "Path to transformed customer data"

  - name: load_date
    type: DATETIME
    defaults: "{{ now() }}"

tasks:
  # Task 1: Create staging tables
  - id: create_staging_tables
    type: io.kestra.plugin.jdbc.snowflake.Query
    sql: |
      CREATE OR REPLACE TEMPORARY TABLE staging_sales (
        sale_id VARCHAR(100),
        transaction_date DATE,
        customer_id VARCHAR(50),
        product_id VARCHAR(50),
        quantity INTEGER,
        unit_price DECIMAL(10,2),
        total_amount DECIMAL(10,2),
        payment_method VARCHAR(50),
        region VARCHAR(100),
        sales_rep VARCHAR(100),
        source_system VARCHAR(50),
        loaded_at TIMESTAMP,
        batch_id VARCHAR(100)
      );

      CREATE OR REPLACE TEMPORARY TABLE staging_customers (
        customer_id VARCHAR(50),
        customer_name VARCHAR(255),
        email VARCHAR(255),
        join_date DATE,
        customer_tier VARCHAR(50),
        lifetime_value DECIMAL(10,2),
        last_updated TIMESTAMP,
        batch_id VARCHAR(100)
      );
    role: "{{ secret('SNOWFLAKE_ROLE') }}"
    warehouse: "{{ secret('SNOWFLAKE_WAREHOUSE') }}"

  # Task 2: Load data to staging (parallel)
  - id: load_staging_data
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: load_sales_staging
        type: io.kestra.plugin.jdbc.snowflake.Load
        from: "{{ inputs.sales_data_path }}"
        table: staging_sales
        schema: TEMPORARY
        fileFormat: "(TYPE = PARQUET)"
        copyOptions:
          - ON_ERROR = CONTINUE
        role: "{{ secret('SNOWFLAKE_ROLE') }}"
        warehouse: "{{ secret('SNOWFLAKE_WAREHOUSE') }}"

      - id: load_customers_staging
        type: io.kestra.plugin.jdbc.snowflake.Load
        from: "{{ inputs.customer_data_path }}"
        table: staging_customers
        schema: TEMPORARY
        fileFormat: "(TYPE = PARQUET)"
        copyOptions:
          - ON_ERROR = CONTINUE
        role: "{{ secret('SNOWFLAKE_ROLE') }}"
        warehouse: "{{ secret('SNOWFLAKE_WAREHOUSE') }}"

  # Task 3: Validate staging data
  - id: validate_staging_data
    type: io.kestra.plugin.jdbc.snowflake.Query
    fetch: true
    sql: |
      WITH staging_stats AS (
        SELECT 
          COUNT(*) as staging_count,
          COUNT(DISTINCT sale_id) as unique_sales,
          SUM(CASE WHEN sale_id IS NULL THEN 1 ELSE 0 END) as null_sales_ids,
          SUM(CASE WHEN total_amount < 0 THEN 1 ELSE 0 END) as negative_amounts
        FROM staging_sales
      ),
      existing_stats AS (
        SELECT COUNT(*) as existing_count
        FROM sales_fact 
        WHERE DATE(loaded_at) = '{{ inputs.load_date | date('yyyy-MM-dd') }}'
      )
      SELECT 
        s.staging_count,
        s.unique_sales,
        s.null_sales_ids,
        s.negative_amounts,
        e.existing_count,
        CASE 
          WHEN s.null_sales_ids > 0 THEN 'FAIL'
          WHEN s.negative_amounts > s.staging_count * 0.01 THEN 'WARN'
          ELSE 'PASS'
        END as validation_status
      FROM staging_stats s
      CROSS JOIN existing_stats e
    role: "{{ secret('SNOWFLAKE_ROLE') }}"
    warehouse: "{{ secret('SNOWFLAKE_WAREHOUSE') }}"

    dependsOn:
      - load_staging_data

  # Task 4: Merge to production (incremental load)
  - id: merge_to_production
    type: io.kestra.plugin.jdbc.snowflake.Query
    sql: |
      -- Merge sales data
      MERGE INTO sales_fact AS target
      USING staging_sales AS source
      ON target.sale_id = source.sale_id
      AND target.transaction_date = source.transaction_date
      WHEN MATCHED THEN
        UPDATE SET 
          customer_id = source.customer_id,
          product_id = source.product_id,
          quantity = source.quantity,
          unit_price = source.unit_price,
          total_amount = source.total_amount,
          payment_method = source.payment_method,
          region = source.region,
          sales_rep = source.sales_rep,
          source_system = source.source_system,
          loaded_at = source.loaded_at,
          batch_id = source.batch_id
      WHEN NOT MATCHED THEN
        INSERT (
          sale_id, transaction_date, customer_id, product_id,
          quantity, unit_price, total_amount, payment_method,
          region, sales_rep, source_system, loaded_at, batch_id
        )
        VALUES (
          source.sale_id, source.transaction_date, source.customer_id, source.product_id,
          source.quantity, source.unit_price, source.total_amount, source.payment_method,
          source.region, source.sales_rep, source.source_system, source.loaded_at, source.batch_id
        );

      -- Merge customer data
      MERGE INTO customer_dim AS target
      USING staging_customers AS source
      ON target.customer_id = source.customer_id
      WHEN MATCHED AND target.last_updated < source.last_updated THEN
        UPDATE SET 
          customer_name = source.customer_name,
          email = source.email,
          join_date = source.join_date,
          customer_tier = source.customer_tier,
          lifetime_value = source.lifetime_value,
          last_updated = source.last_updated
      WHEN NOT MATCHED THEN
        INSERT (
          customer_id, customer_name, email, join_date,
          customer_tier, lifetime_value, last_updated
        )
        VALUES (
          source.customer_id, source.customer_name, source.email, source.join_date,
          source.customer_tier, source.lifetime_value, source.last_updated
        );
    role: "{{ secret('SNOWFLAKE_ROLE') }}"
    warehouse: "{{ secret('SNOWFLAKE_WAREHOUSE') }}"

    dependsOn:
      - validate_staging_data

  # Task 5: Generate load statistics
  - id: generate_load_stats
    type: io.kestra.plugin.jdbc.snowflake.Query
    fetch: true
    sql: |
      SELECT 
        'sales_fact' as table_name,
        COUNT(*) as total_rows,
        COUNT(DISTINCT sale_id) as unique_sales,
        SUM(total_amount) as total_revenue,
        MIN(transaction_date) as earliest_date,
        MAX(transaction_date) as latest_date
      FROM sales_fact
      WHERE batch_id = '{{ vars.batch_id }}'

      UNION ALL

      SELECT 
        'customer_dim' as table_name,
        COUNT(*) as total_customers,
        COUNT(DISTINCT customer_id) as unique_customers,
        SUM(lifetime_value) as total_lifetime_value,
        MIN(join_date) as earliest_join,
        MAX(join_date) as latest_join
      FROM customer_dim
      WHERE last_updated >= '{{ inputs.load_date | date('yyyy-MM-dd') }}'
    role: "{{ secret('SNOWFLAKE_ROLE') }}"
    warehouse: "{{ secret('SNOWFLAKE_WAREHOUSE') }}"

    dependsOn:
      - merge_to_production

  # Task 6: Send load completion notification
  - id: send_load_notification
    type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
    url: "{{ secret('SLACK_WEBHOOK_URL') }}"
    payload: |
      {
        "blocks": [
          {
            "type": "header",
            "text": {
              "type": "plain_text",
              "text": "โœ… ETL Load Complete"
            }
          },
          {
            "type": "section",
            "fields": [
              {
                "type": "mrkdwn",
                "text": "*Load Date:*\n{{ inputs.load_date | date('yyyy-MM-dd HH:mm') }}"
              },
              {
                "type": "mrkdwn",
                "text": "*Batch ID:*\n{{ vars.batch_id }}"
              }
            ]
          },
          {% for row in outputs.generate_load_stats.rows %}
          {
            "type": "section",
            "text": {
              "type": "mrkdwn",
              "text": "*{{ row.table_name }}*\nRows: {{ row.total_rows }}\n{% if row.table_name == 'sales_fact' %}Revenue: ${{ row.total_revenue | round(2) }}{% else %}LTV: ${{ row.total_lifetime_value | round(2) }}{% endif %}"
            }
          },
          {% endfor %}
          {
            "type": "context",
            "elements": [
              {
                "type": "mrkdwn",
                "text": "Execution: {{ execution.id }}"
              }
            ]
          }
        ]
      }

    dependsOn:
      - generate_load_stats

Alternative: BigQuery Loading

# flows/load/load-to-bigquery.yml
id: load-to-bigquery
namespace: acme.sales.etl

tasks:
  - id: load_sales_to_bigquery
    type: io.kestra.plugin.gcp.bigquery.Load
    from: "{{ inputs.sales_data_path }}"
    destinationTable:
      projectId: "{{ secret('GCP_PROJECT_ID') }}"
      datasetId: sales_data
      tableId: sales_fact
    format: PARQUET
    writeDisposition: WRITE_APPEND
    createDisposition: CREATE_IF_NEEDED

  - id: run_dbt_transformation
    type: io.kestra.plugin.dbt.cli.Run
    projectDir: "s3://acme-dbt-project/"
    profilesDir: "s3://acme-dbt-profiles/"
    target: prod
    select:
      - sales_fact+
      - customer_dim+

Phase 6: Orchestrating the Complete Pipeline

Master Orchestrator Flow

# flows/orchestrator/daily-sales-etl.yml
id: daily-sales-etl
namespace: acme.sales
revision: 1
description: |
  # Daily Sales ETL Pipeline

  Complete end-to-end pipeline for sales data.
  Extracts from 4 sources, transforms, validates, and loads to warehouse.

  ## SLA: 4 hours
  ## Schedule: Daily at 1 AM UTC
  ## Owner: Data Engineering Team

labels:
  pipeline: sales-etl
  tier: 1
  sla: 4-hours
  owner: data-engineering

variables:
  pipeline_version: "1.0"
  notification_channels:
    - slack: "#data-alerts"
    - email: "data-eng@acme.com"
  retry_policy:
    max_attempts: 3
    delay_minutes: 5

inputs:
  - name: execution_date
    type: DATETIME
    defaults: "{{ now() | date('yyyy-MM-dd') }}"
  - name: reprocess_full_day
    type: BOOLEAN
    defaults: false

tasks:
  # ========== PHASE 1: SETUP ==========
  - id: initialize_pipeline
    type: io.kestra.plugin.core.flow.Sequential
    description: "Pipeline initialization and validation"
    tasks:
      - id: validate_execution_date
        type: io.kestra.plugin.core.condition.Expression
        expression: >
          {{ inputs.execution_date | date('yyyy-MM-dd') <= 
             now() | date('yyyy-MM-dd') }}
        errorMessage: "Execution date cannot be in the future"

      - id: check_previous_executions
        type: io.kestra.plugin.core.flow.Previous
        namespace: "{{ namespace.id }}"
        flowId: "{{ flow.id }}"
        successOnly: true

      - id: should_execute
        type: io.kestra.plugin.core.condition.Expression
        expression: >
          {{ inputs.reprocess_full_day }} or
          {{ outputs.check_previous_executions.executions | length == 0 }} or
          {{ outputs.check_previous_executions.executions[0].state.current != 'SUCCESS' }}

      - id: log_pipeline_start
        type: io.kestra.plugin.core.log.Log
        message: |
          🚀 Starting Daily Sales ETL Pipeline
          Date: {{ inputs.execution_date }}
          Pipeline Version: {{ vars.pipeline_version }}
          Execution ID: {{ execution.id }}

  # ========== PHASE 2: EXTRACTION ==========
  - id: extract_all_sources
    type: io.kestra.plugin.core.flow.Parallel
    description: "Extract data from all sources in parallel"
    tasks:
      - id: extract_stripe
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.sales.etl
        flowId: extract-stripe-payments
        wait: true
        transmit:
          - name: start_date
            value: "{{ inputs.execution_date }}"
          - name: end_date
            value: "{{ inputs.execution_date }}"

      - id: extract_shopify
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.sales.etl
        flowId: extract-shopify-orders
        wait: true

      - id: extract_internal
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.sales.etl
        flowId: extract-internal-sales
        wait: true

      - id: extract_manual
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.sales.etl
        flowId: extract-google-sheets
        wait: true

    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ outputs.initialize_pipeline.outputs.should_execute.value }}"

  # ========== PHASE 3: TRANSFORMATION ==========
  - id: transform_and_validate
    type: io.kestra.plugin.core.flow.Sequential
    description: "Transform and validate extracted data"
    tasks:
      - id: run_transformations
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.sales.etl
        flowId: master-transformer
        wait: true

      - id: check_transformation_quality
        type: io.kestra.plugin.core.condition.Expression
        expression: >
          {{ outputs.run_transformations.outputs.data_quality_check.outputs.quality_score | default(100) }} >= 95
        errorMessage: "Data quality score below 95%"

    dependsOn:
      - extract_all_sources

  # ========== PHASE 4: LOADING ==========
  - id: load_to_data_warehouse
    type: io.kestra.plugin.core.flow.Subflow
    namespace: acme.sales.etl
    flowId: load-to-warehouse
    wait: true
    transmit:
      - name: sales_data_path
        value: "{{ outputs.transform_and_validate.outputs.run_transformations.outputs.calculate_metrics.outputs.final_data }}"
      - name: customer_data_path
        value: "{{ outputs.transform_and_validate.outputs.run_transformations.outputs.enrich_customer_data.outputs.customer_file }}"
      - name: load_date
        value: "{{ inputs.execution_date }}"

    dependsOn:
      - transform_and_validate

  # ========== PHASE 5: POST-PROCESSING ==========
  - id: post_processing
    type: io.kestra.plugin.core.flow.Parallel
    description: "Post-load processing and reporting"
    tasks:
      - id: generate_daily_report
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.sales.reporting
        flowId: generate-daily-sales-report
        wait: true
        transmit:
          - name: report_date
            value: "{{ inputs.execution_date }}"

      - id: update_data_catalog
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.data.governance
        flowId: update-data-catalog
        wait: true

      - id: archive_raw_data
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.data.storage
        flowId: archive-raw-data
        wait: true

    dependsOn:
      - load_to_data_warehouse

  # ========== PHASE 6: COMPLETION ==========
  - id: pipeline_completion
    type: io.kestra.plugin.core.flow.Sequential
    description: "Pipeline completion and notifications"
    tasks:
      - id: calculate_pipeline_metrics
        type: io.kestra.plugin.core.debug.Return
        format: >
          {
            "start_time": "{{ execution.startDate }}",
            "end_time": "{{ now() }}",
            "duration_seconds": {{ (now() - execution.startDate).totalSeconds() | round(2) }},
            "extraction_records": {{ outputs.extract_all_sources.outputs.extract_stripe.outputs.merge_stripe_data.vars.validation.total_records | default(0) }},
            "loaded_records": {{ outputs.load_to_data_warehouse.outputs.generate_load_stats.rows[0].total_rows | default(0) }},
            "success_rate": 100
          }

      - id: send_success_notification
        type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
        url: "{{ secret('SLACK_WEBHOOK_URL') }}"
        payload: |
          {
            "blocks": [
              {
                "type": "header",
                "text": {
                  "type": "plain_text",
                  "text": "๐ŸŽ‰ Daily Sales ETL Complete"
                }
              },
              {
                "type": "section",
                "fields": [
                  {
                    "type": "mrkdwn",
                    "text": "*Date:*\n{{ inputs.execution_date }}"
                  },
                  {
                    "type": "mrkdwn",
                    "text": "*Duration:*\n{{ outputs.calculate_pipeline_metrics.value.duration_seconds | round(2) }} seconds"
                  }
                ]
              },
              {
                "type": "section",
                "text": {
                  "type": "mrkdwn",
                  "text": "*Records Processed*\nExtracted: {{ outputs.calculate_pipeline_metrics.value.extraction_records }}\nLoaded: {{ outputs.calculate_pipeline_metrics.value.loaded_records }}"
                }
              }
            ]
          }

      - id: update_monitoring_dashboard
        type: io.kestra.plugin.core.http.Request
        uri: "{{ secret('MONITORING_API_URL') }}/api/v1/metrics"
        method: POST
        headers:
          Authorization: "Bearer {{ secret('MONITORING_API_KEY') }}"
        body: |
          {
            "metric": "etl_pipeline_duration",
            "value": {{ outputs.calculate_pipeline_metrics.value.duration_seconds }},
            "tags": {
              "pipeline": "daily-sales-etl",
              "date": "{{ inputs.execution_date }}"
            }
          }

    dependsOn:
      - post_processing

  # ========== ERROR HANDLING ==========
  - id: handle_pipeline_failure
    type: io.kestra.plugin.core.flow.Sequential
    description: "Handle pipeline failures"
    tasks:
      - id: send_failure_alert
        type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
        url: "{{ secret('SLACK_CRITICAL_WEBHOOK_URL') }}"
        payload: |
          {
            "blocks": [
              {
                "type": "header",
                "text": {
                  "type": "plain_text",
                  "text": "๐Ÿšจ ETL Pipeline Failure"
                }
              },
              {
                "type": "section",
                "text": {
                  "type": "mrkdwn",
                  "text": "*Pipeline:* Daily Sales ETL\n*Date:* {{ inputs.execution_date }}\n*Error:* {{ execution.state.current }}\n*Execution:* {{ execution.id }}"
                }
              },
              {
                "type": "actions",
                "elements": [
                  {
                    "type": "button",
                    "text": {
                      "type": "plain_text",
                      "text": "View Details"
                    },
                    "url": "{{ serverUrl }}/ui/executions/{{ execution.id }}"
                  }
                ]
              }
            ]
          }

      - id: trigger_backup_pipeline
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.backup
        flowId: run-backup-sales-pipeline
        wait: false  # Fire and forget

    conditions:
      - type: io.kestra.plugin.core.condition.Execution
        states: [FAILED, KILLED, WARNING]

triggers:
  - id: daily_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 1 * * *"  # 1 AM UTC daily
    timezone: UTC
    inputs:
      execution_date: "{{ now() | date('yyyy-MM-dd') }}"

  - id: manual_trigger
    type: io.kestra.plugin.core.trigger.Webhook
    key: daily-sales-etl
    inputs:
      execution_date: "{{ trigger.date | default(now() | date('yyyy-MM-dd')) }}"
      reprocess_full_day: "{{ trigger.reprocess | default(false) }}"

timeout: PT4H  # 4 hour timeout

listeners:
  - conditions:
      - type: io.kestra.plugin.core.condition.Execution
        states: [FAILED]
    tasks:
      - id: emergency_cleanup
        type: io.kestra.plugin.core.flow.Subflow
        namespace: system.emergency
        flowId: cleanup-failed-pipeline
        transmit:
          - name: execution_id
            value: "{{ execution.id }}"

Phase 7: Testing and Monitoring

Unit Tests for Transformation Logic

# tests/unit/test_transformations.py
import pytest
import pandas as pd
from datetime import datetime
import sys
sys.path.append('../scripts')
from transformations import transform_stripe_charge, validate_sales_data

def test_transform_stripe_charge():
    """Test Stripe charge transformation"""
    sample_charge = {
        'id': 'ch_123',
        'created': 1672531200,  # 2023-01-01
        'customer': 'cus_456',
        'amount': 1000,
        'currency': 'usd',
        'payment_method_details': {'type': 'card'},
        'billing_details': {'address': {'country': 'US'}}
    }

    result = transform_stripe_charge(sample_charge)

    assert result['sale_id'] == 'stripe_ch_123'
    assert result['total_amount'] == 10.0  # Converted from cents
    assert result['region'] == 'US'
    assert result['source_system'] == 'stripe'

def test_validate_sales_data():
    """Test sales data validation"""
    # Valid data
    valid_df = pd.DataFrame({
        'sale_id': ['1', '2'],
        'total_amount': [100.0, 200.0],
        'quantity': [1, 2]
    })

    assert validate_sales_data(valid_df) == []

    # Invalid data
    invalid_df = pd.DataFrame({
        'sale_id': ['1', None],
        'total_amount': [-100.0, 200.0],
        'quantity': [1, -1]
    })

    errors = validate_sales_data(invalid_df)
    assert len(errors) == 3  # Null sale_id, negative amount, negative quantity

def test_integration_flow():
    """Test complete flow execution"""
    # This would use Kestra's test framework
    # to execute flows with mock data
    pass
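The tests above import `transform_stripe_charge` and `validate_sales_data` from `scripts/transformations.py`. If you are following along without that module, here is a minimal sketch of both helpers, written only to be consistent with the assertions in the tests; the names, error-message format, and field mapping are assumptions, not the canonical implementation:

```python
# Hypothetical sketch of the helpers the unit tests import; the real
# scripts/transformations.py may differ -- treat this as illustrative.
import pandas as pd
from datetime import datetime, timezone

def transform_stripe_charge(charge: dict) -> dict:
    """Map a raw Stripe charge onto the sales_fact schema."""
    created = charge.get('created')
    return {
        'sale_id': f"stripe_{charge['id']}",
        'transaction_date': (
            datetime.fromtimestamp(created, tz=timezone.utc).date().isoformat()
            if created else None
        ),
        'customer_id': charge.get('customer'),
        'total_amount': charge['amount'] / 100.0,  # Stripe amounts are in cents
        'payment_method': charge.get('payment_method_details', {}).get('type'),
        'region': charge.get('billing_details', {}).get('address', {}).get('country'),
        'source_system': 'stripe',
    }

def validate_sales_data(df: pd.DataFrame) -> list:
    """Return one error message per failed check; an empty list means valid."""
    errors = []
    null_ids = int(df['sale_id'].isnull().sum())
    if null_ids:
        errors.append(f"{null_ids} row(s) with null sale_id")
    negative_amounts = int((df['total_amount'] < 0).sum())
    if negative_amounts:
        errors.append(f"{negative_amounts} row(s) with negative total_amount")
    negative_qty = int((df['quantity'] < 0).sum())
    if negative_qty:
        errors.append(f"{negative_qty} row(s) with negative quantity")
    return errors
```

Keeping validation as a pure function over a DataFrame is what makes it testable outside Kestra: the same code runs in pytest locally and inside a `Script` task in production.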

Integration Test Flow

# tests/integration/test-daily-etl.yml
id: test-daily-etl
namespace: acme.tests
description: Integration test for daily ETL pipeline

tasks:
  - id: setup_test_data
    type: io.kestra.plugin.scripts.python.Script
    script: |
      # Generate test data
      import pandas as pd
      import numpy as np
      from datetime import datetime, timedelta

      # Create test Stripe data
      dates = pd.date_range(start='2024-01-01', end='2024-01-02', freq='H')
      stripe_data = []
      for i, date in enumerate(dates):
          stripe_data.append({
              'id': f'ch_test_{i}',
              'created': int(date.timestamp()),
              'customer': f'cus_test_{i % 100}',
              'amount': np.random.randint(100, 10000),
              'currency': 'usd',
              'status': 'succeeded'
          })

      df = pd.DataFrame(stripe_data)
      df.to_parquet('test_stripe_data.parquet', index=False)

  - id: run_etl_with_test_data
    type: io.kestra.plugin.core.flow.Subflow
    namespace: acme.sales.etl
    flowId: daily-sales-etl
    wait: true
    transmit:
      - name: execution_date
        value: "2024-01-01"
      # Override extract task to use test data

  - id: validate_results
    type: io.kestra.plugin.jdbc.snowflake.Query
    fetch: true
    sql: |
      SELECT 
        COUNT(*) as loaded_count,
        SUM(total_amount) as total_revenue
      FROM sales_fact
      WHERE batch_id = '{{ outputs.run_etl_with_test_data.executionId }}'

  - id: assert_test_passed
    type: io.kestra.plugin.core.condition.Expression
    expression: "{{ outputs.validate_results.rows[0].loaded_count > 0 }}"
    errorMessage: "Test failed: No data loaded"

Monitoring Dashboard

# flows/monitoring/etl-dashboard.yml
id: etl-monitoring-dashboard
namespace: acme.monitoring
description: Generate ETL monitoring dashboard

tasks:
  - id: collect_metrics
    type: io.kestra.plugin.jdbc.snowflake.Query
    fetch: true
    sql: |
      WITH pipeline_stats AS (
        SELECT 
          DATE(loaded_at) as load_date,
          COUNT(*) as records_loaded,
          SUM(total_amount) as daily_revenue,
          COUNT(DISTINCT source_system) as sources_loaded
        FROM sales_fact
        WHERE loaded_at >= DATEADD(day, -30, CURRENT_DATE())
        GROUP BY DATE(loaded_at)
      ),
      error_stats AS (
        SELECT 
          DATE(start_date) as error_date,
          COUNT(*) as failed_executions,
          STRING_AGG(flow_id, ', ') as failed_flows
        FROM kestra.executions
        WHERE state = 'FAILED'
          AND start_date >= DATEADD(day, -30, CURRENT_DATE())
          AND namespace LIKE 'acme.sales%'
        GROUP BY DATE(start_date)
      )
      SELECT 
        COALESCE(p.load_date, e.error_date) as date,
        p.records_loaded,
        p.daily_revenue,
        p.sources_loaded,
        COALESCE(e.failed_executions, 0) as failed_executions,
        e.failed_flows
      FROM pipeline_stats p
      FULL OUTER JOIN error_stats e ON p.load_date = e.error_date
      ORDER BY date DESC

  - id: generate_dashboard
    type: io.kestra.plugin.scripts.python.Script
    inputFiles:
      metrics.json: |
        {{ outputs.collect_metrics.rows | toJson }}
    script: |
      import pandas as pd
      import plotly.graph_objects as go
      from plotly.subplots import make_subplots
      import json

      # Load metrics from the injected input file
      with open('metrics.json') as f:
          data = json.load(f)
      df = pd.DataFrame(data)

      # Create dashboard
      fig = make_subplots(
          rows=2, cols=2,
          subplot_titles=('Records Loaded', 'Daily Revenue', 
                         'Sources Loaded', 'Failed Executions')
      )

      # Records loaded
      fig.add_trace(
          go.Bar(x=df['date'], y=df['records_loaded'], name='Records'),
          row=1, col=1
      )

      # Daily revenue
      fig.add_trace(
          go.Scatter(x=df['date'], y=df['daily_revenue'], 
                    mode='lines+markers', name='Revenue'),
          row=1, col=2
      )

      # Sources loaded
      fig.add_trace(
          go.Scatter(x=df['date'], y=df['sources_loaded'],
                    mode='lines+markers', name='Sources'),
          row=2, col=1
      )

      # Failed executions
      fig.add_trace(
          go.Bar(x=df['date'], y=df['failed_executions'], 
                name='Failures', marker_color='red'),
          row=2, col=2
      )

      # Update layout
      fig.update_layout(
          title='ETL Pipeline Dashboard - Last 30 Days',
          showlegend=False,
          height=800
      )

      # Save dashboard
      fig.write_html('etl_dashboard.html')
      print('Dashboard generated: etl_dashboard.html')

Phase 8: Deployment and Maintenance

CI/CD Pipeline for ETL Flows

# .github/workflows/deploy-etl.yml
name: Deploy ETL Flows
on:
  push:
    branches: [main]
    paths:
      - 'flows/**'
      - 'scripts/**'

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Validate YAML syntax
        run: |
          pip install yamllint
          yamllint -c .yamllint flows/

      - name: Validate with Kestra
        run: |
          docker run --rm \
            -v $(pwd)/flows:/flows \
            kestra/kestra:latest \
            flow validate /flows

  test:
    runs-on: ubuntu-latest
    needs: validate
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_PASSWORD: test
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    steps:
      - uses: actions/checkout@v4

      - name: Run integration tests
        run: |
          docker-compose -f docker-compose.test.yml up \
            --abort-on-container-exit \
            --exit-code-from tests

  deploy:
    runs-on: ubuntu-latest
    needs: test
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4

      - name: Deploy to Development
        run: |
          curl -X POST "https://dev-kestra.acme.com/api/v1/flows/batch" \
            -H "Authorization: Bearer ${{ secrets.KESTRA_DEV_TOKEN }}" \
            -F "files=@flows/orchestrator/daily-sales-etl.yml"

      - name: Deploy to Production
        if: success()
        run: |
          curl -X POST "https://prod-kestra.acme.com/api/v1/flows/batch" \
            -H "Authorization: Bearer ${{ secrets.KESTRA_PROD_TOKEN }}" \
            -F "files=@flows/orchestrator/daily-sales-etl.yml"

Backup and Recovery Strategy

# flows/maintenance/backup-flows.yml
id: backup-flows
namespace: system.maintenance
description: Backup all flows and configurations

tasks:
  - id: export_all_flows
    type: io.kestra.plugin.core.flow.Export
    namespace: ".*"  # All namespaces

  - id: compress_backup
    type: io.kestra.plugin.scripts.shell.Commands
    commands:
      - tar -czf flows_backup_$(date +%Y%m%d_%H%M%S).tar.gz *.yml

  - id: upload_to_s3
    type: io.kestra.plugin.aws.s3.Upload
    from: "flows_backup_*.tar.gz"
    bucket: acme-kestra-backups
    key: "backups/{{ execution.startDate | date('yyyy/MM/dd') }}/"

  - id: cleanup_old_backups
    type: io.kestra.plugin.aws.s3.Delete
    bucket: acme-kestra-backups
    regex: "backups/.*\.tar\.gz"
    age: P30D  # Delete backups older than 30 days

Conclusion: Your Production-Ready ETL Pipeline

Congratulations! You've just built a complete, production-grade ETL pipeline with Kestra. Let's review what you've accomplished:

✅ What You Built:

  1. Multi-source extraction from Stripe, Shopify, internal DB, and Google Sheets

  2. Robust transformation with data quality checks and error handling

  3. Incremental loading to Snowflake with merge operations

  4. Comprehensive monitoring with alerts and dashboards

  5. Testing framework for validation and regression testing

  6. CI/CD pipeline for automated deployment

🎯 Key Success Factors:

  1. Modular design: Each component is independent and testable

  2. Error resilience: Graceful handling of API failures, data issues

  3. Observability: Complete visibility into pipeline execution

  4. Maintainability: Clear documentation and version control

  5. Scalability: Parallel processing for large datasets
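On the error-resilience point: inside Kestra you would normally lean on task-level `retry` configuration, but the underlying idea is just retry with exponential backoff. A standalone sketch of that idea (illustrative only, not code from the pipeline above):

```python
import time

def with_retries(fn, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call fn(), retrying with exponential backoff on any exception.

    The injectable `sleep` parameter is a testability convenience,
    not something the pipeline itself requires.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the real error
            sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
```

Wrapping an extraction call such as a Stripe page fetch in `with_retries` keeps transient API failures from failing the whole run, mirroring what the YAML-level retry policy does for tasks.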

🚀 Next Steps to Production:

  1. Performance Tuning:

     # Monitor and optimize
     kubectl top pods -n kestra
     # Check slow queries in Snowflake
    
  2. Security Hardening:

    • Rotate API keys monthly

    • Enable audit logging

    • Implement network policies

    • Regular security scans

  3. Cost Optimization:

    • Right-size compute resources

    • Implement data retention policies

    • Monitor cloud spend

  4. Disaster Recovery:

     # Regular backup-restore testing: schedule the DR flow itself
     id: test-backup-restore
     namespace: system.dr
     triggers:
       - id: weekly_restore_test
         type: io.kestra.plugin.core.trigger.Schedule
         cron: "0 0 * * 0"  # Weekly, Sunday midnight UTC
    

📈 Measuring Success:

Track these KPIs:

  1. Pipeline reliability: >99.5% success rate

  2. Data freshness: <4 hours from source to warehouse

  3. Data quality: <1% error rate in transformations

  4. Cost efficiency: <$0.01 per 1000 records processed

  5. Team productivity: <1 hour to add new data source
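The first three KPIs are easy to compute from your execution and load records. A hedged sketch, assuming a simple record shape (`status`, `lag_hours`, `rows_ok`, `rows_total` are made-up field names; adapt them to whatever your monitoring store exposes):

```python
# Illustrative KPI calculations over pipeline run records.
# The record fields are assumptions, not a real Kestra API shape.

def pipeline_kpis(runs: list) -> dict:
    total = len(runs)
    successes = sum(1 for r in runs if r['status'] == 'SUCCESS')
    rows_total = sum(r['rows_total'] for r in runs)
    rows_ok = sum(r['rows_ok'] for r in runs)
    return {
        # target: > 99.5%
        'success_rate_pct': round(100 * successes / total, 2) if total else 0.0,
        # target: < 4 hours from source to warehouse
        'max_freshness_hours': max((r['lag_hours'] for r in runs), default=0.0),
        # target: < 1% transformation error rate
        'error_rate_pct': round(100 * (1 - rows_ok / rows_total), 2) if rows_total else 0.0,
    }
```

Feeding the output into the monitoring dashboard flow from Phase 7 gives you a daily trend line against each target.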

🔮 Future Enhancements:

  1. Real-time processing: Add streaming with Kafka

  2. Machine learning: Integrate model training pipelines

  3. Data governance: Implement data lineage and catalog

  4. Self-service: Build UI for business users to trigger flows

  5. Cross-region replication: For global redundancy


Your Challenge:

Now it's your turn! Pick one improvement to implement:

  1. Add a new data source (e.g., Salesforce, HubSpot)

  2. Implement data masking for PII in test environments

  3. Build a real-time alert for revenue anomalies

  4. Create a self-healing pipeline that auto-retries with different strategies

  5. Add predictive scaling based on data volume patterns
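If you pick challenge 3, a simple starting point is a z-score over recent daily revenue; this sketch uses only the standard library, and the threshold of 3 standard deviations is an arbitrary starting value you should tune:

```python
import statistics

def revenue_anomaly(history, today, z_threshold=3.0):
    """Flag today's revenue if it deviates from the historical mean
    by more than z_threshold standard deviations."""
    if len(history) < 2:
        return False  # not enough history to judge
    mean = statistics.mean(history)
    std = statistics.stdev(history)
    if std == 0:
        return today != mean  # flat history: any change is notable
    return abs(today - mean) / std > z_threshold
```

Run it in a small flow against the last 30 days from `sales_fact` and route a `True` result to the same Slack webhook the pipeline already uses.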

Share what you build in the comments below!


Remember: The best ETL pipeline isn't the most complex; it's the most reliable. Start simple, monitor everything, and iterate based on data.

In the next article, we'll dive into Advanced Workflow Patterns in Kestra, where we'll explore parallel processing, dynamic workflows, and machine learning pipelines.

Before the next article, try:

  1. Deploy your ETL pipeline to a cloud environment

  2. Set up monitoring alerts for your critical flows

  3. Run a backfill for the last 7 days of data

  4. Document your pipeline architecture for your team

Happy data engineering! 🚀