Article 4: Building Your First ETL Pipeline with Kestra.
From Data Extraction to Loading - A Practical Guide
Introduction: Why ETL Still Matters in the Modern Data Stack
Remember when data engineering was "extract, transform, load"? Some say ETL is dead, replaced by ELT, reverse ETL, and data mesh. But here's the truth: ETL is more alive than ever, just smarter. And Kestra makes it accessible to everyone, not just senior data engineers.
In this article, you'll build a complete production-ready ETL pipeline that:
- Extracts data from multiple sources (API, CSV, database)
- Transforms with proper error handling and validation
- Loads to a data warehouse
- Monitors data quality
- Sends alerts on failures
- Scales to handle millions of records
The Business Problem: ACME Corp needs daily sales reports from multiple sources. Currently, marketing runs manual Excel reports, finance has SQL queries, and sales uses a legacy system. Your mission: automate everything into a single source of truth.
Phase 1: Blueprint Your ETL Pipeline
Understand Your Data Sources
Before writing a single line of YAML, document your sources:
| Source | Type | Frequency | Volume | Format | Owner |
|---|---|---|---|---|---|
| Stripe API | Payment processor | Daily | 10K records | JSON | Finance |
| Shopify | E-commerce | Hourly | 50K records | CSV | Marketing |
| PostgreSQL | Internal DB | Daily | 100K records | SQL | Sales |
| Google Sheets | Manual input | Weekly | 500 records | CSV | Ops |
Design Your Target Schema
-- Target: Data warehouse (Snowflake/BigQuery/PostgreSQL)
CREATE TABLE sales_fact (
sale_id VARCHAR(100) PRIMARY KEY,
transaction_date DATE,
customer_id VARCHAR(50),
product_id VARCHAR(50),
quantity INTEGER,
unit_price DECIMAL(10,2),
total_amount DECIMAL(10,2),
payment_method VARCHAR(50),
region VARCHAR(100),
sales_rep VARCHAR(100),
source_system VARCHAR(50),
loaded_at TIMESTAMP,
batch_id VARCHAR(100)
);
CREATE TABLE customer_dim (
customer_id VARCHAR(50) PRIMARY KEY,
customer_name VARCHAR(255),
email VARCHAR(255),
join_date DATE,
customer_tier VARCHAR(50),
lifetime_value DECIMAL(10,2),
last_updated TIMESTAMP
);
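Every warehouse row carries lineage columns (`source_system`, `loaded_at`, `batch_id`) so a bad batch can be traced and re-loaded. A minimal sketch of that stamping step, assuming plain-dict records and a hypothetical `stamp_batch` helper:

```python
from datetime import datetime, timezone

def stamp_batch(record: dict, source_system: str, batch_id: str) -> dict:
    """Attach the lineage columns defined in sales_fact (hypothetical helper)."""
    return {
        **record,
        "source_system": source_system,
        "loaded_at": datetime.now(timezone.utc).isoformat(),
        "batch_id": batch_id,
    }

# Stamp one extracted record before it heads to the load layer
row = stamp_batch(
    {"sale_id": "stripe_ch_123", "total_amount": 49.99},
    source_system="stripe",
    batch_id="exec-2024-01-15",
)
```

The same batch_id goes on every row of a run, which is what makes the `WHERE batch_id = ...` load statistics later in the article possible.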
Define Success Criteria
Your pipeline must:
- Process all data within 2 hours
- Maintain 99.9% data accuracy
- Handle missing data gracefully
- Send alerts on failures within 15 minutes
- Support backfills for 90 days
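The 90-day backfill requirement boils down to replaying the daily pipeline once per date. One way to sketch that (the `backfill_windows` helper is illustrative, not part of Kestra):

```python
from datetime import date, timedelta

def backfill_windows(end: date, days: int) -> list:
    """Produce one-day (start, end) windows for a backfill, newest first.

    Each window maps to one re-run of the daily flow with explicit
    start_date/end_date inputs.
    """
    return [
        (end - timedelta(days=i + 1), end - timedelta(days=i))
        for i in range(days)
    ]

# 90 daily windows ending at 2024-01-15
windows = backfill_windows(date(2024, 1, 15), days=90)
```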
Phase 2: Setting Up Your Development Environment
Project Structure
# Create project structure
mkdir -p etl-pipeline/{flows,scripts,config,tests}
mkdir -p etl-pipeline/flows/{extract,transform,load,monitor}
mkdir -p etl-pipeline/scripts/{python,sql}
mkdir -p etl-pipeline/config/{dev,prod}
mkdir -p etl-pipeline/tests/{unit,integration}
# Initialize git
cd etl-pipeline
git init
echo "# ACME Sales ETL Pipeline" > README.md
Configuration Files
# config/dev/kestra.yml
kestra:
  server:
    basic-auth:
      enabled: true
      username: admin
      password: ${DEV_PASSWORD:?}
  repository:
    type: postgres
    postgres:
      host: localhost
      port: 5432
      database: kestra_dev
  storage:
    type: local
    local:
      base-path: ./storage
  variables:
    environment: dev
    api_timeout: PT5M
    batch_size: 10000
    alert_email: dev-team@acme.com
  secrets:
    stripe_api_key: ${STRIPE_API_KEY}
    shopify_token: ${SHOPIFY_TOKEN}
    db_password: ${DB_PASSWORD}
# config/prod/kestra.yml
kestra:
  server:
    basic-auth:
      enabled: true
      username: ${ADMIN_USER}
      password: ${ADMIN_PASSWORD}
  repository:
    type: postgres
    postgres:
      host: prod-db.acme.com
      port: 5432
      database: kestra_prod
  storage:
    type: s3
    s3:
      bucket: acme-kestra-prod
      region: us-east-1
  variables:
    environment: production
    api_timeout: PT10M
    batch_size: 50000
    alert_email: data-eng@acme.com
    slack_channel: "#data-alerts"
  secrets:
    stripe_api_key: ${STRIPE_API_KEY}
    shopify_token: ${SHOPIFY_TOKEN}
    db_password: ${DB_PASSWORD_PROD}
    snowflake_password: ${SNOWFLAKE_PASSWORD}
Environment Variables
# .env.dev
DEV_PASSWORD=dev_password_123
STRIPE_API_KEY=sk_test_abc123
SHOPIFY_TOKEN=shpat_xyz789
DB_PASSWORD=postgres_pass
SNOWFLAKE_PASSWORD=snowflake_dev_pass
# .env.prod (keep secret!)
ADMIN_USER=admin
ADMIN_PASSWORD=super_secure_prod_pass_456
STRIPE_API_KEY=sk_live_xyz789
SHOPIFY_TOKEN=shpat_live_abc123
DB_PASSWORD_PROD=prod_super_secure_pass
SNOWFLAKE_PASSWORD=snowflake_prod_secure_pass
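The `${DEV_PASSWORD:?}` syntax in the config fails the server at startup when a variable is unset. Any helper scripts you write around the pipeline should fail just as fast. A small sketch of that pattern (the `require_env` helper and `ACME_DEMO_TOKEN` name are illustrative):

```python
import os

def require_env(name: str) -> str:
    """Fail fast when a required secret is missing, mirroring ${VAR:?}."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"required environment variable {name} is not set")
    return value

# Demo only: seed a fake variable so the call below succeeds
os.environ.setdefault("ACME_DEMO_TOKEN", "sk_test_abc123")
token = require_env("ACME_DEMO_TOKEN")
```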
Docker Compose for Development
# docker-compose.yml
version: '3.8'

services:
  kestra:
    image: kestra/kestra:latest
    ports:
      - "8080:8080"
    environment:
      - KESTRA_CONFIGURATION=file:///app/config/dev/kestra.yml
    volumes:
      - ./flows:/app/flows
      - ./scripts:/app/scripts
      - ./config/dev:/app/config
      - ./storage:/app/storage
      - ./logs:/app/logs
    env_file:
      - .env.dev
    depends_on:
      - postgres
      - minio

  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_DB: kestra_dev
      POSTGRES_USER: kestra
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init-db:/docker-entrypoint-initdb.d

  minio:
    image: minio/minio
    command: server /data --console-address ":9090"
    ports:
      - "9000:9000"
      - "9090:9090"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    volumes:
      - minio_data:/data

  # Optional: add test data sources
  mock-stripe:
    image: stoplight/prism:4
    command: mock -h 0.0.0.0 https://raw.githubusercontent.com/stripe/openapi/master/openapi/spec3.json
    ports:
      - "4010:4010"

volumes:
  postgres_data:
  minio_data:
Phase 3: Building the Extract Layer
Source 1: Stripe Payments API
# flows/extract/stripe-payments.yml
id: extract-stripe-payments
namespace: acme.sales.etl
revision: 1
description: |
Extract payment data from Stripe API.
## Data Characteristics
- Daily volume: ~10,000 transactions
- Historical data available for 90 days
- Includes: charges, customers, refunds
## Rate Limits
- 100 requests per second
- Paginated responses (limit 100 per page)
labels:
source: stripe
data-type: transactions
pii: true
compliance: pci-dss
variables:
stripe_base_url: "https://api.stripe.com/v1"
default_limit: 100
max_pages: 1000
inputs:
- name: start_date
type: DATETIME
description: "Start date for data extraction"
defaults: "{{ now() | dateAdd(-1, 'DAYS') | date('yyyy-MM-dd') }}"
- name: end_date
type: DATETIME
description: "End date for data extraction"
defaults: "{{ now() | date('yyyy-MM-dd') }}"
- name: data_types
type: ARRAY
description: "Types of data to extract"
defaults: ["charges", "customers", "refunds", "payment_intents"]
tasks:
# Task 1: Validate inputs
- id: validate_dates
type: io.kestra.plugin.core.condition.Expression
expression: >
{{ inputs.start_date | date('yyyy-MM-dd') <=
inputs.end_date | date('yyyy-MM-dd') }}
errorMessage: "Start date must be before or equal to end date"
# Task 2: Calculate date range
- id: generate_date_range
type: io.kestra.plugin.core.debug.Return
format: >
{{ range(inputs.start_date | date('yyyy-MM-dd'),
inputs.end_date | date('yyyy-MM-dd'),
'P1D') | tojson }}
# Task 3: Extract each data type in parallel
- id: extract_data_types
type: io.kestra.plugin.core.flow.EachParallel
value: "{{ inputs.data_types }}"
tasks:
- id: "extract_{{ task.value }}"
type: io.kestra.plugin.core.http.Request
description: "Extract {{ task.value }} from Stripe"
uri: "{{ vars.stripe_base_url }}/{{ task.value }}"
method: GET
headers:
Authorization: "Bearer {{ secret('STRIPE_API_KEY') }}"
queryParams:
limit: "{{ vars.default_limit }}"
created[gte]: "{{ inputs.start_date | date('timestamp') }}"
created[lte]: "{{ inputs.end_date | date('timestamp') }}"
# Pagination handling
pagination:
type: next_page
results: body.data
nextPageToken: body.has_more
nextPageTokenPath: body.has_more
nextPageCursorPath: body.data[-1].id
nextPageQueryParams:
starting_after: "{{ nextPageToken }}"
# Rate limiting
rateLimit:
requestsPerSecond: 10
# Error handling
retry:
type: exponential
maxAttempt: 5
delay: PT10S
# Store results
store: true
outputFile: "stripe_{{ task.value }}_{{ execution.startDate | date('yyyyMMddHHmmss') }}.json"
# Task 4: Merge and validate extracted data
- id: merge_stripe_data
type: io.kestra.plugin.scripts.python.Script
description: "Merge and validate Stripe data"
inputFiles:
{% for data_type in inputs.data_types %}
stripe_{{ data_type }}.json: "{{ outputs.extract_data_types.outputs['extract_' + data_type].uri }}"
{% endfor %}
script: |
import json
import pandas as pd
from datetime import datetime

# Load all extracted data
all_data = {}
for data_type in {{ inputs.data_types | tojson }}:
    with open(f'stripe_{data_type}.json', 'r') as f:
        data = json.load(f)
    all_data[data_type] = data['data']

# Validate data
validation_results = {
    'extraction_time': datetime.now().isoformat(),
    'total_records': sum(len(data) for data in all_data.values()),
    'record_counts': {k: len(v) for k, v in all_data.items()},
    'date_range': {
        'start': '{{ inputs.start_date }}',
        'end': '{{ inputs.end_date }}'
    }
}

# Check for missing required fields
required_fields = {
    'charges': ['id', 'amount', 'currency', 'customer'],
    'customers': ['id', 'email', 'created'],
    'refunds': ['id', 'charge', 'amount']
}
validation_results['missing_fields'] = {}
for data_type, records in all_data.items():
    if data_type in required_fields:
        for record in records[:10]:  # Sample check
            missing = [field for field in required_fields[data_type]
                       if field not in record]
            if missing:
                validation_results['missing_fields'].setdefault(data_type, []).append(missing)

# Save merged data
merged_data = {}
for data_type, records in all_data.items():
    df = pd.DataFrame(records)
    df.to_parquet(f'stripe_{data_type}_merged.parquet', index=False)
    merged_data[data_type] = f'stripe_{data_type}_merged.parquet'

# Save validation results
with open('validation_report.json', 'w') as f:
    json.dump(validation_results, f, indent=2)

# Output to Kestra
print(json.dumps({
    'validation': validation_results,
    'files': merged_data
}))
taskRunner:
type: io.kestra.plugin.core.runner.Process
memory: 2Gi
cpu: 1
image: python:3.11-slim
dependsOn:
- extract_data_types
# Task 5: Send extraction summary
- id: send_extraction_summary
type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
url: "{{ secret('SLACK_WEBHOOK_URL') }}"
payload: |
{
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "๐ฅ Stripe Data Extraction Complete"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*Date Range:*\n{{ inputs.start_date }} to {{ inputs.end_date }}"
},
{
"type": "mrkdwn",
"text": "*Total Records:*\n{{ outputs.merge_stripe_data.vars.validation.total_records }}"
}
]
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "{% for data_type, count in outputs.merge_stripe_data.vars.validation.record_counts.items() %}โข *{{ data_type }}*: {{ count }} records\n{% endfor %}"
}
}
]
}
dependsOn:
- merge_stripe_data
triggers:
- id: daily_extraction
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 1 * * *" # 1 AM daily
timezone: UTC
inputs:
start_date: "{{ now() | dateAdd(-1, 'DAYS') | date('yyyy-MM-dd') }}"
end_date: "{{ now() | date('yyyy-MM-dd') }}"
timeout: PT1H
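The flow's `pagination` block handles Stripe's cursor pagination declaratively; it helps to see the same loop spelled out. A sketch of the cursor walk (`fetch_page` stands in for the HTTP call; `fake_page` simulates a paginated endpoint), assuming Stripe's documented `starting_after`/`has_more` contract:

```python
def fetch_all(fetch_page, limit=100, max_pages=1000):
    """Cursor pagination, Stripe style: pass the last seen id as
    starting_after until has_more is false."""
    records, cursor = [], None
    for _ in range(max_pages):
        page = fetch_page(limit=limit, starting_after=cursor)
        records.extend(page["data"])
        if not page["has_more"]:
            break
        cursor = page["data"][-1]["id"]
    return records

# Fake paginated endpoint: 5 records served 2 at a time
ITEMS = [{"id": f"ch_{i}"} for i in range(5)]

def fake_page(limit, starting_after):
    start = 0
    if starting_after is not None:
        start = next(i for i, r in enumerate(ITEMS) if r["id"] == starting_after) + 1
    chunk = ITEMS[start:start + limit]
    return {"data": chunk, "has_more": start + limit < len(ITEMS)}

all_records = fetch_all(fake_page, limit=2)
```

The `max_pages` guard mirrors the flow's `max_pages: 1000` variable: it caps runaway pagination if a cursor ever stops advancing.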
Source 2: Shopify E-commerce Data
# flows/extract/shopify-orders.yml
id: extract-shopify-orders
namespace: acme.sales.etl
description: Extract order data from Shopify
tasks:
- id: get_order_ids
type: io.kestra.plugin.core.http.Request
uri: "https://{{ secret('SHOPIFY_STORE') }}.myshopify.com/admin/api/2024-01/orders.json"
method: GET
headers:
X-Shopify-Access-Token: "{{ secret('SHOPIFY_TOKEN') }}"
queryParams:
status: any
created_at_min: "{{ execution.startDate | dateAdd(-1, 'DAYS') | date('yyyy-MM-dd') }}T00:00:00Z"
limit: 250
store: true
pagination:
type: header_link
maxPage: 100
- id: enrich_orders
type: io.kestra.plugin.core.flow.EachParallel
value: "{{ outputs.get_order_ids.body.orders | map('id') | list }}"
maxParallel: 5
tasks:
- id: "get_order_{{ task.value }}"
type: io.kestra.plugin.core.http.Request
uri: "https://{{ secret('SHOPIFY_STORE') }}.myshopify.com/admin/api/2024-01/orders/{{ task.value }}.json"
method: GET
headers:
X-Shopify-Access-Token: "{{ secret('SHOPIFY_TOKEN') }}"
queryParams:
fields: "id,email,created_at,financial_status,total_price,line_items,customer"
store: true
outputFile: "shopify_order_{{ task.value }}.json"
Source 3: Internal PostgreSQL Database
# flows/extract/internal-sales.yml
id: extract-internal-sales
namespace: acme.sales.etl
description: Extract sales data from internal database
tasks:
- id: extract_sales_transactions
type: io.kestra.plugin.jdbc.postgresql.Query
url: "jdbc:postgresql://{{ secret('DB_HOST') }}:5432/{{ secret('DB_NAME') }}"
username: "{{ secret('DB_USER') }}"
password: "{{ secret('DB_PASSWORD') }}"
sql: |
WITH daily_sales AS (
SELECT
t.transaction_id,
t.transaction_date,
c.customer_id,
c.customer_name,
p.product_id,
p.product_name,
t.quantity,
t.unit_price,
t.total_amount,
t.sales_rep_id,
sr.region,
'internal' as source_system,
CURRENT_TIMESTAMP as extracted_at
FROM transactions t
JOIN customers c ON t.customer_id = c.customer_id
JOIN products p ON t.product_id = p.product_id
JOIN sales_reps sr ON t.sales_rep_id = sr.rep_id
WHERE t.transaction_date >= '{{ execution.startDate | dateAdd(-1, 'DAYS') | date('yyyy-MM-dd') }}'
AND t.transaction_date < '{{ execution.startDate | date('yyyy-MM-dd') }}'
AND t.status = 'completed'
)
SELECT * FROM daily_sales
fetch: true
fetchSize: 10000
chunkSize: 5000
- id: save_to_parquet
type: io.kestra.plugin.scripts.python.Script
inputFiles:
sales_data.json: |
{{ outputs.extract_sales_transactions.rows | tojson }}
script: |
import pandas as pd
import json

# Read the rows file mounted by inputFiles (json.loads on the
# filename itself would fail - we need the file's contents)
with open('sales_data.json', 'r') as f:
    data = json.load(f)
df = pd.DataFrame(data)

# Convert date columns
date_columns = ['transaction_date', 'extracted_at']
for col in date_columns:
    if col in df.columns:
        df[col] = pd.to_datetime(df[col])

# Save to parquet
df.to_parquet('internal_sales.parquet', index=False)
print(f"Saved {len(df)} records to internal_sales.parquet")
dependsOn:
- extract_sales_transactions
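The `fetchSize`/`chunkSize` settings on the JDBC task exist so the full result set is never held in memory at once. The same idea in plain Python, using an in-memory SQLite table as a stand-in for the internal database (the `fetch_in_chunks` helper is illustrative):

```python
import sqlite3

def fetch_in_chunks(conn, sql, chunk_size=5000):
    """Stream query results in fixed-size chunks instead of fetchall()."""
    cursor = conn.execute(sql)
    while True:
        rows = cursor.fetchmany(chunk_size)
        if not rows:
            break
        yield rows

# Demo: 12 transactions streamed 5 at a time
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE transactions (transaction_id INTEGER, total_amount REAL)")
conn.executemany(
    "INSERT INTO transactions VALUES (?, ?)",
    [(i, 10.0) for i in range(12)],
)
chunks = list(fetch_in_chunks(conn, "SELECT * FROM transactions", chunk_size=5))
```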
Source 4: Google Sheets (Manual Input)
# flows/extract/google-sheets.yml
id: extract-google-sheets
namespace: acme.sales.etl
description: Extract manual inputs from Google Sheets
tasks:
- id: authenticate_google
type: io.kestra.plugin.gcp.Oauth
scopes:
- https://www.googleapis.com/auth/spreadsheets.readonly
credentials: "{{ secret('GOOGLE_CREDENTIALS_JSON') }}"
- id: extract_sheet_data
type: io.kestra.plugin.gcp.sheets.Read
spreadsheetId: "{{ secret('GOOGLE_SHEET_ID') }}"
range: "SalesData!A:Z"
store: true
- id: convert_to_structured
type: io.kestra.plugin.scripts.python.Script
inputFiles:
sheet_data.json: "{{ outputs.extract_sheet_data.uri }}"
script: |
import pandas as pd
import json

with open('sheet_data.json', 'r') as f:
    data = json.load(f)

# Assuming first row is headers
rows = data['values']
if len(rows) < 2:
    raise ValueError("No data in sheet")

headers = rows[0]
data_rows = rows[1:]
df = pd.DataFrame(data_rows, columns=headers)

# Clean up data
df = df.dropna(how='all')  # Remove empty rows

# Standardize column names
df.columns = df.columns.str.lower().str.replace(' ', '_')

# Save
df.to_parquet('manual_sales.parquet', index=False)
print(f"Extracted {len(df)} manual entries")
Phase 4: Building the Transform Layer
Transformation Pipeline Architecture
# flows/transform/master-transformer.yml
id: master-transformer
namespace: acme.sales.etl
description: Orchestrate all data transformations
tasks:
# Parallel transformation of all sources
- id: transform_all_sources
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: transform_stripe
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sales.etl
flowId: transform-stripe-payments
wait: true
transmit:
- name: input_files
value: "{{ parent.outputs.extract_stripe_payments.outputs.merge_stripe_data.vars.files }}"
- id: transform_shopify
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sales.etl
flowId: transform-shopify-orders
wait: true
- id: transform_internal
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sales.etl
flowId: transform-internal-sales
wait: true
- id: transform_manual
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sales.etl
flowId: transform-google-sheets
wait: true
# Deduplicate across sources
- id: deduplicate_records
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sales.etl
flowId: deduplicate-sales
wait: true
transmit:
- name: stripe_data
value: "{{ outputs.transform_all_sources.outputs.transform_stripe.outputs.transformed_files }}"
- name: shopify_data
value: "{{ outputs.transform_all_sources.outputs.transform_shopify.outputs.transformed_files }}"
- name: internal_data
value: "{{ outputs.transform_all_sources.outputs.transform_internal.outputs.transformed_files }}"
- name: manual_data
value: "{{ outputs.transform_all_sources.outputs.transform_manual.outputs.transformed_files }}"
# Enrich with customer data
- id: enrich_customer_data
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sales.etl
flowId: enrich-customer-dimension
wait: true
transmit:
- name: sales_data
value: "{{ outputs.deduplicate_records.outputs.deduplicated_file }}"
# Calculate metrics
- id: calculate_metrics
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sales.etl
flowId: calculate-sales-metrics
wait: true
transmit:
- name: enriched_data
value: "{{ outputs.enrich_customer_data.outputs.enriched_file }}"
# Generate data quality report
- id: data_quality_check
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.data.quality
flowId: run-data-quality-checks
wait: true
transmit:
- name: transformed_data
value: "{{ outputs.calculate_metrics.outputs.final_data }}"
- name: validation_rules
value: |
{
"rules": [
{"field": "total_amount", "type": "not_null"},
{"field": "customer_id", "type": "not_null"},
{"field": "transaction_date", "type": "not_null"},
{"field": "total_amount", "type": "positive"},
{"field": "quantity", "type": "positive_integer"}
]
}
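The `deduplicate-sales` subflow is referenced above but its logic isn't shown. When two sources report the same sale, you need a tiebreak rule; the sketch below assumes a simple one (keep the copy with more populated fields) purely for illustration:

```python
def deduplicate(records, key="sale_id"):
    """Cross-source dedup: keep the most complete record per sale_id.

    Assumed rule: the record with more non-null fields wins; your
    business rules may prefer a source-priority order instead.
    """
    best = {}
    for record in records:
        k = record[key]
        filled = sum(1 for v in record.values() if v is not None)
        if k not in best or filled > best[k][0]:
            best[k] = (filled, record)
    return [record for _, record in best.values()]

# s1 arrives from two sources; the richer copy survives
merged = deduplicate([
    {"sale_id": "s1", "total_amount": 10.0, "region": None},
    {"sale_id": "s1", "total_amount": 10.0, "region": "US"},
    {"sale_id": "s2", "total_amount": 5.0, "region": "DE"},
])
```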
Stripe Transformation Flow
# flows/transform/transform-stripe-payments.yml
id: transform-stripe-payments
namespace: acme.sales.etl
description: Transform raw Stripe data into standardized format
inputs:
- name: input_files
type: JSON
description: "Map of data type to file path"
- name: transformation_date
type: DATETIME
defaults: "{{ now() }}"
tasks:
- id: load_and_transform
type: io.kestra.plugin.scripts.python.Script
description: "Transform Stripe payments data"
inputFiles:
{% for data_type, file_path in inputs.input_files.items() %}
{{ data_type }}.parquet: "{{ file_path }}"
{% endfor %}
transformation_rules.py: |
# Custom transformation rules
def transform_stripe_charge(charge):
    """Transform a Stripe charge record"""
    return {
        'sale_id': f"stripe_{charge['id']}",
        'transaction_date': charge['created'],
        'customer_id': charge.get('customer'),
        'product_id': 'stripe_payment',
        'quantity': 1,
        'unit_price': charge['amount'] / 100,  # Convert from cents
        'total_amount': charge['amount'] / 100,
        'payment_method': charge.get('payment_method_details', {}).get('type', 'unknown'),
        'region': charge.get('billing_details', {}).get('address', {}).get('country'),
        'sales_rep': 'stripe',
        'source_system': 'stripe',
        'metadata': {
            'currency': charge.get('currency'),
            'description': charge.get('description'),
            'status': charge.get('status')
        }
    }

def transform_stripe_customer(customer):
    """Transform a Stripe customer record"""
    return {
        'customer_id': customer['id'],
        'customer_name': customer.get('name'),
        'email': customer.get('email'),
        'join_date': customer.get('created'),
        'customer_tier': 'standard',
        'lifetime_value': customer.get('metadata', {}).get('lifetime_value', 0),
        'metadata': {
            'description': customer.get('description'),
            'phone': customer.get('phone'),
            'address': customer.get('address')
        }
    }
script: |
import pandas as pd
import numpy as np
from datetime import datetime
import json
from pathlib import Path

# Import transformation rules
import sys
sys.path.append('.')
from transformation_rules import transform_stripe_charge, transform_stripe_customer

# Load data
charges_df = pd.read_parquet('charges.parquet')
customers_df = pd.read_parquet('customers.parquet')
refunds_df = pd.read_parquet('refunds.parquet')

# Transform charges
print(f"Transforming {len(charges_df)} charges...")
transformed_charges = []
for _, charge in charges_df.iterrows():
    try:
        transformed = transform_stripe_charge(charge.to_dict())
        transformed_charges.append(transformed)
    except Exception as e:
        print(f"Error transforming charge {charge.get('id')}: {e}")

# Transform customers
print(f"Transforming {len(customers_df)} customers...")
transformed_customers = []
for _, customer in customers_df.iterrows():
    try:
        transformed = transform_stripe_customer(customer.to_dict())
        transformed_customers.append(transformed)
    except Exception as e:
        print(f"Error transforming customer {customer.get('id')}: {e}")

# Handle refunds (adjust charges)
if len(refunds_df) > 0:
    print(f"Processing {len(refunds_df)} refunds...")
    refunds_by_charge = refunds_df.groupby('charge')
    # This would adjust the charges data
    # Implementation depends on business rules

# Convert to DataFrames
sales_df = pd.DataFrame(transformed_charges)
customers_df = pd.DataFrame(transformed_customers)

# Data validation
validation_errors = []

# Check for nulls in required fields
required_sales_fields = ['sale_id', 'transaction_date', 'total_amount']
for field in required_sales_fields:
    null_count = sales_df[field].isnull().sum()
    if null_count > 0:
        validation_errors.append(f"Sales {field}: {null_count} null values")

# Check for negative amounts
negative_amounts = (sales_df['total_amount'] < 0).sum()
if negative_amounts > 0:
    validation_errors.append(f"Sales: {negative_amounts} negative amounts")

# Check for future dates
today = pd.Timestamp.now()
future_dates = (pd.to_datetime(sales_df['transaction_date']) > today).sum()
if future_dates > 0:
    validation_errors.append(f"Sales: {future_dates} future dates")

# Save validation report
validation_report = {
    'transformation_date': datetime.now().isoformat(),
    'records_processed': {
        'sales': len(sales_df),
        'customers': len(customers_df)
    },
    'validation_errors': validation_errors,
    'error_rate': len(validation_errors) / (len(sales_df) + len(customers_df)) if (len(sales_df) + len(customers_df)) > 0 else 0
}
with open('validation_report.json', 'w') as f:
    json.dump(validation_report, f, indent=2)

# Save transformed data
sales_df.to_parquet('stripe_sales_transformed.parquet', index=False)
customers_df.to_parquet('stripe_customers_transformed.parquet', index=False)

# Output to Kestra
output = {
    'transformed_files': {
        'sales': 'stripe_sales_transformed.parquet',
        'customers': 'stripe_customers_transformed.parquet'
    },
    'validation': validation_report,
    'summary': {
        'total_sales': len(sales_df),
        'total_customers': len(customers_df),
        'total_revenue': sales_df['total_amount'].sum() if len(sales_df) > 0 else 0
    }
}
print(json.dumps(output))
taskRunner:
type: io.kestra.plugin.core.runner.Process
memory: 4Gi
cpu: 2
image: python:3.11-slim
- id: handle_validation_errors
type: io.kestra.plugin.core.flow.Switch
value: "{{ outputs.load_and_transform.vars.validation.error_rate }}"
cases:
0:
- id: log_success
type: io.kestra.plugin.core.log.Log
message: "Stripe transformation successful with 0 validation errors"
0.01:
- id: log_warning
type: io.kestra.plugin.core.log.Log
message: "Stripe transformation completed with {{ (outputs.load_and_transform.vars.validation.error_rate * 100) | round(2) }}% error rate"
default:
- id: alert_high_error_rate
type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
url: "{{ secret('SLACK_WEBHOOK_URL') }}"
payload: |
{
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "โ ๏ธ High Error Rate in Stripe Transformation"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "Error rate: *{{ (outputs.load_and_transform.vars.validation.error_rate * 100) | round(2) }}%*\n\nValidation errors:\n{% for error in outputs.load_and_transform.vars.validation.validation_errors %}- {{ error }}\n{% endfor %}"
}
}
]
}
Data Quality Transformation
# flows/transform/data-quality-checks.yml
id: run-data-quality-checks
namespace: acme.data.quality
description: Run comprehensive data quality checks
inputs:
- name: transformed_data
type: STRING
description: "Path to transformed data file"
- name: validation_rules
type: JSON
description: "Data quality validation rules"
tasks:
- id: run_great_expectations
type: io.kestra.plugin.greatexpectations.Validation
expectationSuite: |
{
"expectation_suite_name": "sales_data_suite",
"expectations": [
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "sale_id"
}
},
{
"expectation_type": "expect_column_values_to_be_between",
"kwargs": {
"column": "total_amount",
"min_value": 0,
"max_value": 1000000
}
},
{
"expectation_type": "expect_column_values_to_be_in_set",
"kwargs": {
"column": "source_system",
"value_set": ["stripe", "shopify", "internal", "manual"]
}
}
]
}
data:
type: file
path: "{{ inputs.transformed_data }}"
- id: generate_data_profiling
type: io.kestra.plugin.scripts.python.Script
inputFiles:
data.parquet: "{{ inputs.transformed_data }}"
script: |
import pandas as pd
import json
from ydata_profiling import ProfileReport

# Load data
df = pd.read_parquet('data.parquet')

# Generate profile
profile = ProfileReport(df, title="Sales Data Profile")
profile.to_file("data_profile.html")

# Calculate statistics
stats = {
    'row_count': len(df),
    'column_count': len(df.columns),
    'missing_values': df.isnull().sum().to_dict(),
    'data_types': {col: str(dtype) for col, dtype in df.dtypes.items()},
    'numeric_stats': {},
    'categorical_stats': {}
}

# Numeric columns
numeric_cols = df.select_dtypes(include=['number']).columns
for col in numeric_cols:
    stats['numeric_stats'][col] = {
        'mean': float(df[col].mean()),
        'std': float(df[col].std()),
        'min': float(df[col].min()),
        'max': float(df[col].max()),
        'median': float(df[col].median())
    }

# Categorical columns
categorical_cols = df.select_dtypes(include=['object']).columns
for col in categorical_cols:
    stats['categorical_stats'][col] = {
        'unique_count': int(df[col].nunique()),
        'top_value': df[col].mode().iloc[0] if not df[col].mode().empty else None,
        'top_frequency': int(df[col].value_counts().iloc[0]) if not df[col].value_counts().empty else 0
    }

# Save stats
with open('data_statistics.json', 'w') as f:
    json.dump(stats, f, indent=2)

print(json.dumps({'profile_file': 'data_profile.html', 'statistics': stats}))
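The master flow hands this subflow a declarative `validation_rules` JSON (`{"field": ..., "type": ...}`). A small rules engine covering just the three rule types used in this article shows how those rules could be evaluated; the `apply_rules` helper is a sketch, not a Kestra API:

```python
def apply_rules(records, rules):
    """Evaluate declarative data quality rules against dict records.

    Supports only the rule types used above: not_null, positive,
    positive_integer. Returns one entry per failing rule.
    """
    checks = {
        "not_null": lambda v: v is not None,
        "positive": lambda v: v is not None and v > 0,
        "positive_integer": lambda v: isinstance(v, int) and v > 0,
    }
    failures = []
    for rule in rules:
        check = checks[rule["type"]]
        bad = sum(1 for r in records if not check(r.get(rule["field"])))
        if bad:
            failures.append({**rule, "failing_records": bad})
    return failures

# Second record violates all three rules
failures = apply_rules(
    [
        {"total_amount": 49.99, "quantity": 1, "customer_id": "c1"},
        {"total_amount": -5.0, "quantity": 0, "customer_id": None},
    ],
    [
        {"field": "total_amount", "type": "positive"},
        {"field": "quantity", "type": "positive_integer"},
        {"field": "customer_id", "type": "not_null"},
    ],
)
```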
Phase 5: Building the Load Layer
Data Warehouse Loading Strategy
# flows/load/load-to-warehouse.yml
id: load-to-warehouse
namespace: acme.sales.etl
description: Load transformed data to data warehouse
variables:
warehouse_type: snowflake # or bigquery, redshift, postgresql
load_strategy: incremental # or full, upsert
batch_id: "{{ execution.id }}"
inputs:
- name: sales_data_path
type: STRING
description: "Path to transformed sales data"
- name: customer_data_path
type: STRING
description: "Path to transformed customer data"
- name: load_date
type: DATETIME
defaults: "{{ now() }}"
tasks:
# Task 1: Create staging tables
- id: create_staging_tables
type: io.kestra.plugin.jdbc.snowflake.Query
sql: |
CREATE OR REPLACE TEMPORARY TABLE staging_sales (
sale_id VARCHAR(100),
transaction_date DATE,
customer_id VARCHAR(50),
product_id VARCHAR(50),
quantity INTEGER,
unit_price DECIMAL(10,2),
total_amount DECIMAL(10,2),
payment_method VARCHAR(50),
region VARCHAR(100),
sales_rep VARCHAR(100),
source_system VARCHAR(50),
loaded_at TIMESTAMP,
batch_id VARCHAR(100)
);
CREATE OR REPLACE TEMPORARY TABLE staging_customers (
customer_id VARCHAR(50),
customer_name VARCHAR(255),
email VARCHAR(255),
join_date DATE,
customer_tier VARCHAR(50),
lifetime_value DECIMAL(10,2),
last_updated TIMESTAMP,
batch_id VARCHAR(100)
);
role: "{{ secret('SNOWFLAKE_ROLE') }}"
warehouse: "{{ secret('SNOWFLAKE_WAREHOUSE') }}"
# Task 2: Load data to staging (parallel)
- id: load_staging_data
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: load_sales_staging
type: io.kestra.plugin.jdbc.snowflake.Load
from: "{{ inputs.sales_data_path }}"
table: staging_sales
schema: TEMPORARY
fileFormat: "(TYPE = PARQUET)"
copyOptions:
- ON_ERROR = CONTINUE
role: "{{ secret('SNOWFLAKE_ROLE') }}"
warehouse: "{{ secret('SNOWFLAKE_WAREHOUSE') }}"
- id: load_customers_staging
type: io.kestra.plugin.jdbc.snowflake.Load
from: "{{ inputs.customer_data_path }}"
table: staging_customers
schema: TEMPORARY
fileFormat: "(TYPE = PARQUET)"
copyOptions:
- ON_ERROR = CONTINUE
role: "{{ secret('SNOWFLAKE_ROLE') }}"
warehouse: "{{ secret('SNOWFLAKE_WAREHOUSE') }}"
# Task 3: Validate staging data
- id: validate_staging_data
type: io.kestra.plugin.jdbc.snowflake.Query
fetch: true
sql: |
WITH staging_stats AS (
SELECT
COUNT(*) as staging_count,
COUNT(DISTINCT sale_id) as unique_sales,
SUM(CASE WHEN sale_id IS NULL THEN 1 ELSE 0 END) as null_sales_ids,
SUM(CASE WHEN total_amount < 0 THEN 1 ELSE 0 END) as negative_amounts
FROM staging_sales
),
existing_stats AS (
SELECT COUNT(*) as existing_count
FROM sales_fact
WHERE DATE(loaded_at) = '{{ inputs.load_date | date('yyyy-MM-dd') }}'
)
SELECT
s.staging_count,
s.unique_sales,
s.null_sales_ids,
s.negative_amounts,
e.existing_count,
CASE
WHEN s.null_sales_ids > 0 THEN 'FAIL'
WHEN s.negative_amounts > s.staging_count * 0.01 THEN 'WARN'
ELSE 'PASS'
END as validation_status
FROM staging_stats s
CROSS JOIN existing_stats e
role: "{{ secret('SNOWFLAKE_ROLE') }}"
warehouse: "{{ secret('SNOWFLAKE_WAREHOUSE') }}"
dependsOn:
- load_staging_data
# Task 4: Merge to production (incremental load)
- id: merge_to_production
type: io.kestra.plugin.jdbc.snowflake.Query
sql: |
-- Merge sales data
MERGE INTO sales_fact AS target
USING staging_sales AS source
ON target.sale_id = source.sale_id
AND target.transaction_date = source.transaction_date
WHEN MATCHED THEN
UPDATE SET
customer_id = source.customer_id,
product_id = source.product_id,
quantity = source.quantity,
unit_price = source.unit_price,
total_amount = source.total_amount,
payment_method = source.payment_method,
region = source.region,
sales_rep = source.sales_rep,
source_system = source.source_system,
loaded_at = source.loaded_at,
batch_id = source.batch_id
WHEN NOT MATCHED THEN
INSERT (
sale_id, transaction_date, customer_id, product_id,
quantity, unit_price, total_amount, payment_method,
region, sales_rep, source_system, loaded_at, batch_id
)
VALUES (
source.sale_id, source.transaction_date, source.customer_id, source.product_id,
source.quantity, source.unit_price, source.total_amount, source.payment_method,
source.region, source.sales_rep, source.source_system, source.loaded_at, source.batch_id
);
-- Merge customer data
MERGE INTO customer_dim AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED AND target.last_updated < source.last_updated THEN
UPDATE SET
customer_name = source.customer_name,
email = source.email,
join_date = source.join_date,
customer_tier = source.customer_tier,
lifetime_value = source.lifetime_value,
last_updated = source.last_updated
WHEN NOT MATCHED THEN
INSERT (
customer_id, customer_name, email, join_date,
customer_tier, lifetime_value, last_updated
)
VALUES (
source.customer_id, source.customer_name, source.email, source.join_date,
source.customer_tier, source.lifetime_value, source.last_updated
);
role: "{{ secret('SNOWFLAKE_ROLE') }}"
warehouse: "{{ secret('SNOWFLAKE_WAREHOUSE') }}"
dependsOn:
- validate_staging_data
# Task 5: Generate load statistics
- id: generate_load_stats
type: io.kestra.plugin.jdbc.snowflake.Query
fetch: true
sql: |
SELECT
'sales_fact' as table_name,
COUNT(*) as total_rows,
COUNT(DISTINCT sale_id) as unique_sales,
SUM(total_amount) as total_revenue,
MIN(transaction_date) as earliest_date,
MAX(transaction_date) as latest_date
FROM sales_fact
WHERE batch_id = '{{ vars.batch_id }}'
UNION ALL
SELECT
'customer_dim' as table_name,
COUNT(*) as total_customers,
COUNT(DISTINCT customer_id) as unique_customers,
SUM(lifetime_value) as total_lifetime_value,
MIN(join_date) as earliest_join,
MAX(join_date) as latest_join
FROM customer_dim
WHERE last_updated >= '{{ inputs.load_date | date('yyyy-MM-dd') }}'
role: "{{ secret('SNOWFLAKE_ROLE') }}"
warehouse: "{{ secret('SNOWFLAKE_WAREHOUSE') }}"
dependsOn:
- merge_to_production
# Task 6: Send load completion notification
- id: send_load_notification
type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
url: "{{ secret('SLACK_WEBHOOK_URL') }}"
payload: |
{
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "โ
ETL Load Complete"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*Load Date:*\n{{ inputs.load_date | date('yyyy-MM-dd HH:mm') }}"
},
{
"type": "mrkdwn",
"text": "*Batch ID:*\n{{ vars.batch_id }}"
}
]
},
{% for row in outputs.generate_load_stats.rows %}
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*{{ row.table_name }}*\nRows: {{ row.total_rows }}\n{% if row.table_name == 'sales_fact' %}Revenue: ${{ row.total_revenue | round(2) }}{% else %}LTV: ${{ row.total_lifetime_value | round(2) }}{% endif %}"
}
},
{% endfor %}
{
"type": "context",
"elements": [
{
"type": "mrkdwn",
"text": "Execution: {{ execution.id }}"
}
]
}
]
}
dependsOn:
- generate_load_stats
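Before pointing these statements at Snowflake, it can help to check the upsert logic locally. Here is a rough pandas equivalent of the `customer_dim` MERGE above; it is only a sketch (the function name, and the assumption that `customer_id` is unique within each frame, are mine, not part of the flow):

```python
import pandas as pd

def merge_dimension(target: pd.DataFrame, source: pd.DataFrame) -> pd.DataFrame:
    """Pandas sketch of the MERGE: update matched keys only when the
    incoming row is newer, insert keys the target has never seen."""
    merged = target.set_index('customer_id')
    incoming = source.set_index('customer_id')

    # WHEN MATCHED AND target.last_updated < source.last_updated THEN UPDATE
    common = merged.index.intersection(incoming.index)
    is_newer = (incoming.loc[common, 'last_updated']
                > merged.loc[common, 'last_updated']).to_numpy()
    merged.loc[common[is_newer]] = incoming.loc[common[is_newer]]

    # WHEN NOT MATCHED THEN INSERT
    new_keys = incoming.index.difference(merged.index)
    return pd.concat([merged, incoming.loc[new_keys]]).reset_index()
```

The same three-way split (matched-and-newer, matched-but-stale, unmatched) is exactly what the SQL engine does, just set-based and in place.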
Alternative: BigQuery Loading
# flows/load/load-to-bigquery.yml
id: load-to-bigquery
namespace: acme.sales.etl
tasks:
- id: load_sales_to_bigquery
type: io.kestra.plugin.gcp.bigquery.Load
from: "{{ inputs.sales_data_path }}"
destinationTable:
projectId: "{{ secret('GCP_PROJECT_ID') }}"
datasetId: sales_data
tableId: sales_fact
format: PARQUET
writeDisposition: WRITE_APPEND
createDisposition: CREATE_IF_NEEDED
- id: run_dbt_transformation
type: io.kestra.plugin.dbt.cli.Run
projectDir: "s3://acme-dbt-project/"
profilesDir: "s3://acme-dbt-profiles/"
target: prod
select:
- sales_fact+
- customer_dim+
Phase 6: Orchestrating the Complete Pipeline
Master Orchestrator Flow
# flows/orchestrator/daily-sales-etl.yml
id: daily-sales-etl
namespace: acme.sales
revision: 1
description: |
# Daily Sales ETL Pipeline
Complete end-to-end pipeline for sales data.
Extracts from 4 sources, transforms, validates, and loads to warehouse.
## SLA: 4 hours
## Schedule: Daily at 1 AM UTC
## Owner: Data Engineering Team
labels:
pipeline: sales-etl
tier: 1
sla: 4-hours
owner: data-engineering
variables:
pipeline_version: "1.0"
notification_channels:
- slack: "#data-alerts"
- email: "data-eng@acme.com"
retry_policy:
max_attempts: 3
delay_minutes: 5
inputs:
- name: execution_date
type: DATETIME
defaults: "{{ now() | date('yyyy-MM-dd') }}"
- name: reprocess_full_day
type: BOOLEAN
defaults: false
tasks:
# ========== PHASE 1: SETUP ==========
- id: initialize_pipeline
type: io.kestra.plugin.core.flow.Sequential
description: "Pipeline initialization and validation"
tasks:
- id: validate_execution_date
type: io.kestra.plugin.core.condition.Expression
expression: >
{{ inputs.execution_date | date('yyyy-MM-dd') <=
now() | date('yyyy-MM-dd') }}
errorMessage: "Execution date cannot be in the future"
- id: check_previous_executions
type: io.kestra.plugin.core.flow.Previous
namespace: "{{ namespace.id }}"
flowId: "{{ flow.id }}"
successOnly: true
- id: should_execute
type: io.kestra.plugin.core.condition.Expression
expression: >
{{ inputs.reprocess_full_day }} or
{{ outputs.check_previous_executions.executions | length == 0 }} or
{{ outputs.check_previous_executions.executions[0].state.current != 'SUCCESS' }}
- id: log_pipeline_start
type: io.kestra.plugin.core.log.Log
message: |
🚀 Starting Daily Sales ETL Pipeline
Date: {{ inputs.execution_date }}
Pipeline Version: {{ vars.pipeline_version }}
Execution ID: {{ execution.id }}
# ========== PHASE 2: EXTRACTION ==========
- id: extract_all_sources
type: io.kestra.plugin.core.flow.Parallel
description: "Extract data from all sources in parallel"
tasks:
- id: extract_stripe
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sales.etl
flowId: extract-stripe-payments
wait: true
transmit:
- name: start_date
value: "{{ inputs.execution_date }}"
- name: end_date
value: "{{ inputs.execution_date }}"
- id: extract_shopify
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sales.etl
flowId: extract-shopify-orders
wait: true
- id: extract_internal
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sales.etl
flowId: extract-internal-sales
wait: true
- id: extract_manual
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sales.etl
flowId: extract-google-sheets
wait: true
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: "{{ outputs.initialize_pipeline.outputs.should_execute.value }}"
# ========== PHASE 3: TRANSFORMATION ==========
- id: transform_and_validate
type: io.kestra.plugin.core.flow.Sequential
description: "Transform and validate extracted data"
tasks:
- id: run_transformations
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sales.etl
flowId: master-transformer
wait: true
- id: check_transformation_quality
type: io.kestra.plugin.core.condition.Expression
expression: >
{{ outputs.run_transformations.outputs.data_quality_check.outputs.quality_score | default(100) }} >= 95
errorMessage: "Data quality score below 95%"
dependsOn:
- extract_all_sources
# ========== PHASE 4: LOADING ==========
- id: load_to_data_warehouse
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sales.etl
flowId: load-to-warehouse
wait: true
transmit:
- name: sales_data_path
value: "{{ outputs.transform_and_validate.outputs.run_transformations.outputs.calculate_metrics.outputs.final_data }}"
- name: customer_data_path
value: "{{ outputs.transform_and_validate.outputs.run_transformations.outputs.enrich_customer_data.outputs.customer_file }}"
- name: load_date
value: "{{ inputs.execution_date }}"
dependsOn:
- transform_and_validate
# ========== PHASE 5: POST-PROCESSING ==========
- id: post_processing
type: io.kestra.plugin.core.flow.Parallel
description: "Post-load processing and reporting"
tasks:
- id: generate_daily_report
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sales.reporting
flowId: generate-daily-sales-report
wait: true
transmit:
- name: report_date
value: "{{ inputs.execution_date }}"
- id: update_data_catalog
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.data.governance
flowId: update-data-catalog
wait: true
- id: archive_raw_data
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.data.storage
flowId: archive-raw-data
wait: true
dependsOn:
- load_to_data_warehouse
# ========== PHASE 6: COMPLETION ==========
- id: pipeline_completion
type: io.kestra.plugin.core.flow.Sequential
description: "Pipeline completion and notifications"
tasks:
- id: calculate_pipeline_metrics
type: io.kestra.plugin.core.debug.Return
format: >
{
"start_time": "{{ execution.startDate }}",
"end_time": "{{ now() }}",
"duration_seconds": {{ (now() - execution.startDate).totalSeconds() | round(2) }},
"extraction_records": {{ outputs.extract_all_sources.outputs.extract_stripe.outputs.merge_stripe_data.vars.validation.total_records | default(0) }},
"loaded_records": {{ outputs.load_to_data_warehouse.outputs.generate_load_stats.rows[0].total_rows | default(0) }},
"success_rate": 100
}
- id: send_success_notification
type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
url: "{{ secret('SLACK_WEBHOOK_URL') }}"
payload: |
{
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "๐ Daily Sales ETL Complete"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*Date:*\n{{ inputs.execution_date }}"
},
{
"type": "mrkdwn",
"text": "*Duration:*\n{{ outputs.calculate_pipeline_metrics.value.duration_seconds | round(2) }} seconds"
}
]
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Records Processed*\nExtracted: {{ outputs.calculate_pipeline_metrics.value.extraction_records }}\nLoaded: {{ outputs.calculate_pipeline_metrics.value.loaded_records }}"
}
}
]
}
- id: update_monitoring_dashboard
type: io.kestra.plugin.core.http.Request
uri: "{{ secret('MONITORING_API_URL') }}/api/v1/metrics"
method: POST
headers:
Authorization: "Bearer {{ secret('MONITORING_API_KEY') }}"
body: |
{
"metric": "etl_pipeline_duration",
"value": {{ outputs.calculate_pipeline_metrics.value.duration_seconds }},
"tags": {
"pipeline": "daily-sales-etl",
"date": "{{ inputs.execution_date }}"
}
}
dependsOn:
- post_processing
# ========== ERROR HANDLING ==========
- id: handle_pipeline_failure
type: io.kestra.plugin.core.flow.Sequential
description: "Handle pipeline failures"
tasks:
- id: send_failure_alert
type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
url: "{{ secret('SLACK_CRITICAL_WEBHOOK_URL') }}"
payload: |
{
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "๐จ ETL Pipeline Failure"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Pipeline:* Daily Sales ETL\n*Date:* {{ inputs.execution_date }}\n*Error:* {{ execution.state.current }}\n*Execution:* {{ execution.id }}"
}
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {
"type": "plain_text",
"text": "View Details"
},
"url": "{{ serverUrl }}/ui/executions/{{ execution.id }}"
}
]
}
]
}
- id: trigger_backup_pipeline
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.backup
flowId: run-backup-sales-pipeline
wait: false # Fire and forget
conditions:
- type: io.kestra.plugin.core.condition.Execution
states: [FAILED, KILLED, WARNING]
triggers:
- id: daily_schedule
type: io.kestra.plugin.core.trigger.Schedule
cron: "0 1 * * *" # 1 AM UTC daily
timezone: UTC
inputs:
execution_date: "{{ now() | date('yyyy-MM-dd') }}"
- id: manual_trigger
type: io.kestra.plugin.core.trigger.Webhook
key: daily-sales-etl
inputs:
execution_date: "{{ trigger.date | default(now() | date('yyyy-MM-dd')) }}"
reprocess_full_day: "{{ trigger.reprocess | default(false) }}"
timeout: PT4H # 4 hour timeout
listeners:
- conditions:
- type: io.kestra.plugin.core.condition.Execution
states: [FAILED]
tasks:
- id: emergency_cleanup
type: io.kestra.plugin.core.flow.Subflow
namespace: system.emergency
flowId: cleanup-failed-pipeline
transmit:
- name: execution_id
value: "{{ execution.id }}"
Phase 7: Testing and Monitoring
Unit Tests for Transformation Logic
# tests/unit/test_transformations.py
import pytest
import pandas as pd
from datetime import datetime
import sys
sys.path.append('../scripts')
from transformations import transform_stripe_charge, validate_sales_data
def test_transform_stripe_charge():
"""Test Stripe charge transformation"""
sample_charge = {
'id': 'ch_123',
'created': 1672531200, # 2023-01-01
'customer': 'cus_456',
'amount': 1000,
'currency': 'usd',
'payment_method_details': {'type': 'card'},
'billing_details': {'address': {'country': 'US'}}
}
result = transform_stripe_charge(sample_charge)
assert result['sale_id'] == 'stripe_ch_123'
assert result['total_amount'] == 10.0 # Converted from cents
assert result['region'] == 'US'
assert result['source_system'] == 'stripe'
def test_validate_sales_data():
"""Test sales data validation"""
# Valid data
valid_df = pd.DataFrame({
'sale_id': ['1', '2'],
'total_amount': [100.0, 200.0],
'quantity': [1, 2]
})
assert validate_sales_data(valid_df) == []
# Invalid data
invalid_df = pd.DataFrame({
'sale_id': ['1', None],
'total_amount': [-100.0, 200.0],
'quantity': [1, -1]
})
errors = validate_sales_data(invalid_df)
assert len(errors) == 3 # Null sale_id, negative amount, negative quantity
def test_integration_flow():
"""Test complete flow execution"""
# This would use Kestra's test framework
# to execute flows with mock data
pass
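These tests import `transform_stripe_charge` and `validate_sales_data` from `scripts/transformations.py`. If you are following along without that file, a minimal implementation consistent with the assertions above might look like this (a sketch; the real module from earlier in the series may carry more fields and checks):

```python
import pandas as pd
from datetime import datetime, timezone

def transform_stripe_charge(charge: dict) -> dict:
    """Map a raw Stripe charge into the sales_fact schema."""
    return {
        'sale_id': f"stripe_{charge['id']}",
        'transaction_date': datetime.fromtimestamp(
            charge['created'], tz=timezone.utc).date(),
        'customer_id': charge['customer'],
        'total_amount': charge['amount'] / 100,  # Stripe amounts are in cents
        'payment_method': charge.get('payment_method_details', {}).get('type'),
        'region': charge.get('billing_details', {})
                        .get('address', {}).get('country'),
        'source_system': 'stripe',
    }

def validate_sales_data(df: pd.DataFrame) -> list:
    """Return a list of validation error messages; an empty list means valid."""
    errors = []
    for idx in df.index[df['sale_id'].isna()]:
        errors.append(f"row {idx}: sale_id is null")
    for idx in df.index[df['total_amount'] < 0]:
        errors.append(f"row {idx}: negative total_amount")
    for idx in df.index[df['quantity'] < 0]:
        errors.append(f"row {idx}: negative quantity")
    return errors
```

Returning a list of messages rather than raising on the first problem lets the pipeline report every data-quality issue in one pass.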
Integration Test Flow
# tests/integration/test-daily-etl.yml
id: test-daily-etl
namespace: acme.tests
description: Integration test for daily ETL pipeline
tasks:
- id: setup_test_data
type: io.kestra.plugin.scripts.python.Script
script: |
# Generate test data
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# Create test Stripe data
dates = pd.date_range(start='2024-01-01', end='2024-01-02', freq='H')
stripe_data = []
for i, date in enumerate(dates):
stripe_data.append({
'id': f'ch_test_{i}',
'created': int(date.timestamp()),
'customer': f'cus_test_{i % 100}',
'amount': np.random.randint(100, 10000),
'currency': 'usd',
'status': 'succeeded'
})
df = pd.DataFrame(stripe_data)
df.to_parquet('test_stripe_data.parquet', index=False)
- id: run_etl_with_test_data
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sales.etl
flowId: daily-sales-etl
wait: true
transmit:
- name: execution_date
value: "2024-01-01"
# Override extract task to use test data
- id: validate_results
type: io.kestra.plugin.jdbc.snowflake.Query
fetch: true
sql: |
SELECT
COUNT(*) as loaded_count,
SUM(total_amount) as total_revenue
FROM sales_fact
WHERE batch_id = '{{ outputs.run_etl_with_test_data.executionId }}'
- id: assert_test_passed
type: io.kestra.plugin.core.condition.Expression
expression: "{{ outputs.validate_results.rows[0].loaded_count > 0 }}"
errorMessage: "Test failed: No data loaded"
Monitoring Dashboard
# flows/monitoring/etl-dashboard.yml
id: etl-monitoring-dashboard
namespace: acme.monitoring
description: Generate ETL monitoring dashboard
tasks:
- id: collect_metrics
type: io.kestra.plugin.jdbc.snowflake.Query
fetch: true
sql: |
WITH pipeline_stats AS (
SELECT
DATE(loaded_at) as load_date,
COUNT(*) as records_loaded,
SUM(total_amount) as daily_revenue,
COUNT(DISTINCT source_system) as sources_loaded
FROM sales_fact
WHERE loaded_at >= DATEADD(day, -30, CURRENT_DATE())
GROUP BY DATE(loaded_at)
),
error_stats AS (
SELECT
DATE(start_date) as error_date,
COUNT(*) as failed_executions,
STRING_AGG(flow_id, ', ') as failed_flows
FROM kestra.executions
WHERE state = 'FAILED'
AND start_date >= DATEADD(day, -30, CURRENT_DATE())
AND namespace LIKE 'acme.sales%'
GROUP BY DATE(start_date)
)
SELECT
COALESCE(p.load_date, e.error_date) as date,
p.records_loaded,
p.daily_revenue,
p.sources_loaded,
COALESCE(e.failed_executions, 0) as failed_executions,
e.failed_flows
FROM pipeline_stats p
FULL OUTER JOIN error_stats e ON p.load_date = e.error_date
ORDER BY date DESC
- id: generate_dashboard
type: io.kestra.plugin.scripts.python.Script
inputFiles:
metrics.json: |
{{ outputs.collect_metrics.rows | tojson }}
script: |
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import json
# Load metrics
with open('metrics.json') as f:
    data = json.load(f)
df = pd.DataFrame(data)
# Create dashboard
fig = make_subplots(
rows=2, cols=2,
subplot_titles=('Records Loaded', 'Daily Revenue',
'Sources Loaded', 'Failed Executions')
)
# Records loaded
fig.add_trace(
go.Bar(x=df['date'], y=df['records_loaded'], name='Records'),
row=1, col=1
)
# Daily revenue
fig.add_trace(
go.Scatter(x=df['date'], y=df['daily_revenue'],
mode='lines+markers', name='Revenue'),
row=1, col=2
)
# Sources loaded
fig.add_trace(
go.Scatter(x=df['date'], y=df['sources_loaded'],
mode='lines+markers', name='Sources'),
row=2, col=1
)
# Failed executions
fig.add_trace(
go.Bar(x=df['date'], y=df['failed_executions'],
name='Failures', marker_color='red'),
row=2, col=2
)
# Update layout
fig.update_layout(
title='ETL Pipeline Dashboard - Last 30 Days',
showlegend=False,
height=800
)
# Save dashboard
fig.write_html('etl_dashboard.html')
print('Dashboard generated: etl_dashboard.html')
Phase 8: Deployment and Maintenance
CI/CD Pipeline for ETL Flows
# .github/workflows/deploy-etl.yml
name: Deploy ETL Flows
on:
push:
branches: [main]
paths:
- 'flows/**'
- 'scripts/**'
jobs:
validate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Validate YAML syntax
run: |
pip install yamllint
yamllint -c .yamllint flows/
- name: Validate with Kestra
run: |
docker run --rm \
-v $(pwd)/flows:/flows \
kestra/kestra:latest \
flow validate /flows
test:
runs-on: ubuntu-latest
needs: validate
services:
postgres:
image: postgres:15
env:
POSTGRES_PASSWORD: test
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v4
- name: Run integration tests
run: |
docker-compose -f docker-compose.test.yml up \
--abort-on-container-exit \
--exit-code-from tests
deploy:
runs-on: ubuntu-latest
needs: test
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v4
- name: Deploy to Development
run: |
curl -X POST "https://dev-kestra.acme.com/api/v1/flows/batch" \
-H "Authorization: Bearer ${{ secrets.KESTRA_DEV_TOKEN }}" \
-F "files=@flows/orchestrator/daily-sales-etl.yml"
- name: Deploy to Production
if: success()
run: |
curl -X POST "https://prod-kestra.acme.com/api/v1/flows/batch" \
-H "Authorization: Bearer ${{ secrets.KESTRA_PROD_TOKEN }}" \
-F "files=@flows/orchestrator/daily-sales-etl.yml"
Backup and Recovery Strategy
# flows/maintenance/backup-flows.yml
id: backup-flows
namespace: system.maintenance
description: Backup all flows and configurations
tasks:
- id: export_all_flows
type: io.kestra.plugin.core.flow.Export
namespace: ".*" # All namespaces
- id: compress_backup
type: io.kestra.plugin.scripts.shell.Commands
commands:
- tar -czf flows_backup_$(date +%Y%m%d_%H%M%S).tar.gz *.yml
- id: upload_to_s3
type: io.kestra.plugin.aws.s3.Upload
from: "flows_backup_*.tar.gz"
bucket: acme-kestra-backups
key: "backups/{{ execution.startDate | date('yyyy/MM/dd') }}/"
- id: cleanup_old_backups
type: io.kestra.plugin.aws.s3.Delete
bucket: acme-kestra-backups
regex: "backups/.*\.tar\.gz"
age: P30D # Delete backups older than 30 days
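The `Delete` task prunes archives older than 30 days using its `regex` and `age` settings. The selection rule it applies boils down to the following (a sketch; `keys_to_delete` is a hypothetical helper and the key layout mirrors the upload prefix above):

```python
from datetime import datetime, timedelta, timezone

def keys_to_delete(objects, max_age_days=30, now=None):
    """Given (key, last_modified) pairs, return the backup archives
    older than max_age_days; everything else is left alone."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=max_age_days)
    return [key for key, last_modified in objects
            if key.endswith('.tar.gz') and last_modified < cutoff]
```

Keeping the retention rule this explicit also makes it easy to dry-run against a bucket listing before trusting the automated delete.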
Conclusion: Your Production-Ready ETL Pipeline
Congratulations! You've just built a complete, production-grade ETL pipeline with Kestra. Let's review what you've accomplished:
✅ What You Built:
Multi-source extraction from Stripe, Shopify, internal DB, and Google Sheets
Robust transformation with data quality checks and error handling
Incremental loading to Snowflake with merge operations
Comprehensive monitoring with alerts and dashboards
Testing framework for validation and regression testing
CI/CD pipeline for automated deployment
🎯 Key Success Factors:
Modular design: Each component is independent and testable
Error resilience: Graceful handling of API failures, data issues
Observability: Complete visibility into pipeline execution
Maintainability: Clear documentation and version control
Scalability: Parallel processing for large datasets
🚀 Next Steps to Production:
Performance Tuning:
# Monitor and optimize
kubectl top pods -n kestra
# Check slow queries in Snowflake
Security Hardening:
Rotate API keys monthly
Enable audit logging
Implement network policies
Regular security scans
Cost Optimization:
Right-size compute resources
Implement data retention policies
Monitor cloud spend
Disaster Recovery:
# Regular backup testing
- id: test_restore
  type: io.kestra.plugin.core.flow.Subflow
  namespace: system.dr
  flowId: test-backup-restore
  schedule: "0 0 * * 0" # Weekly
📊 Measuring Success:
Track these KPIs:
Pipeline reliability: >99.5% success rate
Data freshness: <4 hours from source to warehouse
Data quality: <1% error rate in transformations
Cost efficiency: <$0.01 per 1000 records processed
Team productivity: <1 hour to add new data source
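The KPIs above are easy to compute from execution history and billing data; here is a sketch covering the reliability and cost-efficiency targets (the `pipeline_kpis` helper and its field names are assumptions, not a Kestra API):

```python
def pipeline_kpis(executions, total_cost_usd, records_processed):
    """Compute reliability and cost-efficiency KPIs from a list of
    execution dicts like {'state': 'SUCCESS'} plus aggregate figures."""
    total = len(executions)
    successes = sum(1 for e in executions if e['state'] == 'SUCCESS')
    return {
        'success_rate_pct': round(100 * successes / total, 2) if total else None,
        'cost_per_1k_records': (round(1000 * total_cost_usd / records_processed, 4)
                                if records_processed else None),
    }
```

Feeding numbers like these into the monitoring dashboard flow from Phase 7 turns the KPI targets into alertable thresholds rather than aspirations.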
🔮 Future Enhancements:
Real-time processing: Add streaming with Kafka
Machine learning: Integrate model training pipelines
Data governance: Implement data lineage and catalog
Self-service: Build UI for business users to trigger flows
Cross-region replication: For global redundancy
Your Challenge:
Now it's your turn! Pick one improvement to implement:
Add a new data source (e.g., Salesforce, HubSpot)
Implement data masking for PII in test environments
Build a real-time alert for revenue anomalies
Create a self-healing pipeline that auto-retries with different strategies
Add predictive scaling based on data volume patterns
Share what you build in the comments below!
Remember: the best ETL pipeline isn't the most complex; it's the most reliable. Start simple, monitor everything, and iterate based on data.
In the next article, we'll dive into Advanced Workflow Patterns in Kestra, where we'll explore parallel processing, dynamic workflows, and machine learning pipelines.
Before the next article, try:
Deploy your ETL pipeline to a cloud environment
Set up monitoring alerts for your critical flows
Run a backfill for the last 7 days of data
Document your pipeline architecture for your team
Happy data engineering! 🚀