Article 3: Understanding Kestra's Architecture: Flows, Tasks, and Namespaces
Building Blocks of Declarative Orchestration.
Introduction: The Power of Simplicity
Imagine trying to build a house without understanding bricks, beams, and blueprints. That's what using an orchestration tool without understanding its core concepts feels like. Today, we'll transform you from a casual user to an architect who understands every component of Kestra's structure.
By the end of this article, you'll be able to:
Design complex workflows with confidence
Choose the right task for every job
Organize your projects for maximum maintainability
Debug issues by understanding the underlying architecture
Optimize performance at the structural level
Core Concept 1: Namespaces - Your Organizational Foundation
What Are Namespaces?
Think of namespaces as virtual folders or projects in Kestra. They're not just organizational fluff—they're the backbone of security, access control, and workflow management.
# This isn't just a label, it's an organizational system
namespace: finance.europe.reports
Why Namespaces Matter More Than You Think
Real-world analogy: If Kestra were a company:
Namespace = Department (Finance, Marketing, Engineering)
Flow = Project within that department
Task = Individual task in the project
Execution = Project run
Namespace Hierarchy Best Practices
# BAD: Flat structure
namespace: sales_report_2024
# GOOD: Hierarchical structure
namespace: sales
# or better:
namespace: sales.region.europe.reports
# or even:
namespace: sales.region.europe.reports.monthly
Recommended Structure:
company.department.region.function.frequency
└── acme.finance.europe.reports.monthly
└── acme.marketing.us.campaigns.daily
└── acme.engineering.global.ci.cd.triggered
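To make the convention enforceable rather than aspirational, a small pre-deployment check can reject names that break it. Here is a minimal sketch in plain Python; the depth limits and segment regex are illustrative assumptions, not a Kestra rule:

```python
import re

# Hypothetical convention: 2-5 lowercase, dot-separated segments,
# e.g. "acme.finance.europe.reports.monthly"
SEGMENT = re.compile(r"^[a-z][a-z0-9_-]*$")

def valid_namespace(ns: str, min_depth: int = 2, max_depth: int = 5) -> bool:
    """Check that a namespace follows the hierarchical naming convention."""
    parts = ns.split(".")
    if not min_depth <= len(parts) <= max_depth:
        return False
    return all(SEGMENT.match(p) for p in parts)

print(valid_namespace("acme.finance.europe.reports.monthly"))  # True
print(valid_namespace("sales_report_2024"))                    # False: flat
print(valid_namespace("acme.sales.daily.report.january.2024")) # False: too deep
```

A check like this fits naturally in a CI step that runs before flows are pushed to the server.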
Practical Implementation
# namespace-structure.yml
id: create-namespaces
namespace: system.admin
description: Create organizational namespace structure

tasks:
  - id: create-finance-namespace
    type: io.kestra.plugin.core.namespace.CreateNamespace
    namespace: acme.finance
    description: "Finance department workflows"
    variables:
      department: finance
      sla: business_hours
      owner: finance-team@acme.com

  - id: create-finance-subnamespaces
    type: io.kestra.plugin.core.flow.EachSequential
    value: ["reports", "reconciliation", "compliance", "forecasting"]
    tasks:
      - id: create-subnamespace-{{ task.value }}
        type: io.kestra.plugin.core.namespace.CreateNamespace
        namespace: "acme.finance.{{ task.value }}"
        description: "Finance {{ task.value }} workflows"

  - id: set-permissions
    type: io.kestra.plugin.core.namespace.UpdateNamespace
    namespace: acme.finance.reports
    permissions:
      - username: alice@acme.com
        permission: EXECUTE
      - username: bob@acme.com
        permission: READ
      - group: finance-managers
        permission: OWNER
Namespace Configuration Options
# Advanced namespace configuration
id: configure-namespace
namespace: system.config
description: Configure namespace with advanced settings

tasks:
  - id: create-namespace-with-advanced-config
    type: io.kestra.plugin.core.namespace.CreateNamespace
    namespace: acme.data.sensitive
    description: "Namespace for sensitive data processing"
    variables:
      # Custom metadata
      data_classification: PII
      retention_days: 30
      backup_frequency: daily
      compliance: GDPR
    # Task defaults for this namespace
    taskDefaults:
      - type: io.kestra.plugin.scripts.python.Script
        values:
          taskRunner:
            type: io.kestra.plugin.core.runner.Process
            memory: 1Gi
            cpu: 500m
          timeout: PT30M
    # Flow defaults
    flowDefaults:
      triggers:
        - type: io.kestra.plugin.core.trigger.Schedule
          disabled: false
      logging:
        level: INFO
The Hidden Power: Namespace Variables
# Using namespace variables
id: use-namespace-vars
namespace: acme.finance.reports
description: Demonstrates namespace variables

inputs:
  - name: report_date
    type: DATETIME
    defaults: "{{ now() }}"

tasks:
  - id: log-config
    type: io.kestra.plugin.core.log.Log
    message: |
      Namespace: {{ namespace.id }}
      Variables: {{ namespace.variables | tojson }}
      SLA: {{ namespace.variables.sla }}
      Owner: {{ namespace.variables.owner }}
  - id: process-report
    type: io.kestra.plugin.core.debug.Return
    format: >
      Processing {{ namespace.variables.department }} report
      for {{ inputs.report_date | date('yyyy-MM-dd') }}
      with retention {{ namespace.variables.retention_days }} days
Common Namespace Anti-Patterns
Anti-Pattern 1: Too Many Namespaces
# Don't do this:
namespace: acme.sales.daily.report.january.2024.region.europe
# Result: Unmanageable hierarchy, permission nightmares
Anti-Pattern 2: No Structure
# Don't do this:
namespace: report_for_bob
namespace: alice_special_project
namespace: urgent_thing
# Result: Chaos, no discoverability
Anti-Pattern 3: Using Namespaces as Environments
# Instead of:
namespace: dev.sales.report
namespace: prod.sales.report
# Consider:
namespace: sales.report
# Use flow labels or variables for environment:
labels:
  environment: dev
  region: us-east-1
Core Concept 2: Flows - Your Workflow Blueprint
Anatomy of a Flow
A flow is a declarative blueprint for your workflow. Let's dissect one:
# flow-anatomy.yml
id: data-pipeline                              # Unique identifier within namespace
namespace: acme.data.engineering               # Organizational container
revision: 3                                    # Version (auto-managed)
description: "ETL pipeline for customer data"  # Human-readable description

labels:                                        # Metadata for filtering/searching
  team: data-engineering
  priority: high
  data-sensitive: true
  cost-center: marketing

tasks:                                         # The actual workflow steps
  - id: extract
    # Task configuration...
  - id: transform
    # Task configuration...
  - id: load
    # Task configuration...

triggers:                                      # When to run automatically
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 2 * * *"

variables:                                     # Flow-level variables
  batch_size: 1000
  target_table: customers_staging

timeout: PT1H                                  # Maximum execution time
disabled: false                                # Enable/disable flow
Flow Lifecycle: From YAML to Execution
Understanding this lifecycle is crucial for debugging:
1. Definition (YAML)
↓
2. Parsing & Validation
↓
3. Storage (PostgreSQL)
↓
4. Trigger Detection
↓
5. Execution Instantiation
↓
6. Task Execution
↓
7. State Management
↓
8. Completion/Error Handling
Advanced Flow Patterns
Pattern 1: Modular Flows with Subflows
# main-flow.yml
id: main-orchestration
namespace: acme.data

tasks:
  - id: call-subflow
    type: io.kestra.plugin.core.flow.Subflow
    namespace: acme.data.utilities
    flowId: data-validation
    wait: true
    transmit:
      - name: input_file
        value: "{{ outputs.download.uri }}"
    receive:
      - name: validation_report
  - id: continue-processing
    type: io.kestra.plugin.core.log.Log
    message: "Validation result: {{ outputs.call-subflow.outputs.validation_report }}"
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ outputs.call-subflow.state.current == 'SUCCESS' }}"
Pattern 2: Dynamic Flow Generation
# generate-flows.yml
id: flow-generator
namespace: acme.system

tasks:
  - id: get-regions
    type: io.kestra.plugin.jdbc.postgresql.Query
    fetch: true
    sql: "SELECT region_id, region_name FROM sales_regions"
  - id: create-region-flows
    type: io.kestra.plugin.core.flow.EachParallel
    value: "{{ outputs.get-regions.rows }}"
    tasks:
      - id: create-flow-{{ task.value.region_id }}
        type: io.kestra.plugin.core.flow.Create
        namespace: acme.sales.region
        flow:
          id: process_{{ task.value.region_name | lower }}
          namespace: acme.sales.region
          description: "Process sales for {{ task.value.region_name }}"
          tasks:
            - id: extract
              type: io.kestra.plugin.core.http.Download
              uri: "https://api.acme.com/sales/{{ task.value.region_id }}/{{ execution.startDate | date('yyyy-MM-dd') }}"
Pattern 3: Flow Templating
# flow-template.yml
id: report-template
namespace: acme.templates
description: Template for all report flows

variables:
  report_name: REQUIRED
  source_url: REQUIRED
  target_database: "data_warehouse"
  schedule_cron: "0 2 * * *"

tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: "{{ vars.source_url }}/{{ execution.startDate | date('yyyy-MM-dd') }}"
  - id: transform
    type: io.kestra.plugin.scripts.python.Script
    script: |
      # Transform logic for {{ vars.report_name }}
      print("Processing {{ vars.report_name }}")
  - id: load
    type: io.kestra.plugin.jdbc.postgresql.Load
    table: "{{ vars.report_name | lower }}_reports"
    from: "{{ outputs.transform.outputFiles['output.parquet'] }}"

triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "{{ vars.schedule_cron }}"
Flow Metadata and Best Practices
# flow-with-metadata.yml
id: production-pipeline
namespace: acme.production
revision: 1
description: |
  # Production Data Pipeline

  ## Purpose
  Processes customer transactions for billing.

  ## Owner
  Data Engineering Team (data-team@acme.com)

  ## SLA
  - Must complete within 2 hours
  - Runs daily at 2 AM UTC
  - Alert on-call if fails

  ## Data Flow
  1. Extract from Payment API
  2. Validate transactions
  3. Enrich with customer data
  4. Load to Data Warehouse
  5. Generate reconciliation report

documentation: |
  ## Technical Details

  ### Dependencies
  - Payment API v2.1
  - Customer Service v1.3
  - Snowflake DW

  ### Error Handling
  - Retry 3 times on API failures
  - Dead letter queue for invalid records
  - Slack alert on permanent failure

  ### Performance
  - Processes ~1M records/day
  - Average runtime: 45 minutes
  - Peak memory: 2GB

labels:
  environment: production
  tier: 1
  team: data-engineering
  cost-center: billing
  compliance: pci-dss
  data-retention: 7-years

# Audit trail
created: 2024-01-15T10:30:00Z
updated: 2024-01-20T14:45:00Z
updatedBy: alice@acme.com
Flow Versioning and Deployment Strategies
# deployment-strategy.yml
id: deploy-flow-version
namespace: acme.deployment

inputs:
  - name: flow_id
    type: STRING
    required: true
  - name: namespace
    type: STRING
    required: true
  - name: yaml_content
    type: STRING
    required: true
  - name: deployment_strategy
    type: STRING
    choices: [blue-green, canary, immediate]
    defaults: immediate

tasks:
  - id: validate-flow
    type: io.kestra.plugin.core.flow.Validate
    flow: "{{ inputs.yaml_content }}"
  - id: get-current-flow
    type: io.kestra.plugin.core.flow.Get
    namespace: "{{ inputs.namespace }}"
    flowId: "{{ inputs.flow_id }}"
    ignoreMissing: true
  - id: deploy-immediate
    type: io.kestra.plugin.core.flow.Update
    namespace: "{{ inputs.namespace }}"
    flowId: "{{ inputs.flow_id }}"
    flow: "{{ inputs.yaml_content }}"
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ inputs.deployment_strategy == 'immediate' }}"
  - id: deploy-blue-green
    type: io.kestra.plugin.core.flow.Create
    namespace: "{{ inputs.namespace }}"
    flow:
      id: "{{ inputs.flow_id }}-blue"
      namespace: "{{ inputs.namespace }}"
      # ... flow content with -blue suffix
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ inputs.deployment_strategy == 'blue-green' }}"
Core Concept 3: Tasks - The Workhorses of Your Workflow
Task Taxonomy: Understanding the Hierarchy
Task Types
├── Core Tasks (Built-in)
│ ├── Control Flow
│ │ ├── Parallel
│ │ ├── Sequential
│ │ ├── Each (Loop)
│ │ └── Conditional
│ ├── Data Movement
│ │ ├── HTTP
│ │ ├── Files
│ │ └── Databases
│ └── Utilities
│ ├── Logging
│ ├── Debugging
│ └── Notifications
│
├── Script Tasks
│ ├── Python
│ ├── Bash
│ ├── Node.js
│ └── R
│
├── Plugin Tasks
│ ├── Cloud (AWS, GCP, Azure)
│ ├── Databases (PostgreSQL, Snowflake, BigQuery)
│ ├── Messaging (Kafka, RabbitMQ)
│ └── Specialized (dbt, Great Expectations)
│
└── Custom Tasks
└── Java/Scala plugins
Task Anatomy: Every Task Explained
tasks:
  - id: download_file                          # Required: Unique identifier
    type: io.kestra.plugin.core.http.Download  # Required: Task type
    description: "Download sales data"         # Optional: Human description
    uri: "https://api.example.com/data"        # Required: Type-specific params

    # Execution Control
    timeout: PT5M                              # Max execution time
    retry:                                     # Retry configuration
      type: exponential
      maxAttempt: 3
      delay: PT10S

    # Error Handling
    allowFailure: false                        # Continue on failure?
    fatalError: ["CONNECTION_ERROR"]           # Specific errors that stop flow

    # Resource Management
    taskRunner:                                # How to run the task
      type: io.kestra.plugin.core.runner.Process
      memory: 512Mi
      cpu: 250m

    # Input/Output
    inputFiles:                                # Files available to task
      config.json: |
        {"key": "value"}
    outputFiles:                               # Files produced by task
      - "*.parquet"

    # Dependencies
    dependsOn:                                 # Wait for these tasks
      - validate_config
    send:                                      # Data to pass
      - key: url
        value: "{{ outputs.config_reader.api_url }}"

    # Metadata
    labels:                                    # Task-specific labels
      data-sensitive: true
      cost-center: marketing

    # Advanced
    before: []                                 # Hooks
    after: []
    workerGroup: heavy-lifting                 # Specific worker group
Task Execution States
Understanding these states is crucial for debugging. A task run starts in CREATED, moves to RUNNING, and ends in a terminal state such as SUCCESS, WARNING, FAILED, or KILLED; a task being retried passes through RETRYING before running again.
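The legal transitions can be modeled as a small state machine. The toy Python sketch below is illustrative only; Kestra's real state set is larger (PAUSED, KILLING, RESTARTED, and so on):

```python
# Toy model of task-run state transitions -- illustrative only;
# Kestra defines additional states beyond these.
TRANSITIONS = {
    "CREATED":  {"RUNNING"},
    "RUNNING":  {"SUCCESS", "WARNING", "FAILED", "RETRYING"},
    "RETRYING": {"RUNNING"},  # a retry loops back to RUNNING
}
TERMINAL = {"SUCCESS", "WARNING", "FAILED"}

def can_transition(src: str, dst: str) -> bool:
    """Return True if moving from src to dst is a legal transition."""
    return dst in TRANSITIONS.get(src, set())

print(can_transition("CREATED", "RUNNING"))   # True
print(can_transition("SUCCESS", "RUNNING"))   # False: terminal states don't restart
```

When a task seems "stuck", checking which state it is in (and which transitions are legal from there) usually narrows the problem quickly.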
Advanced Task Patterns
Pattern 1: Dynamic Task Generation
tasks:
  - id: get_file_list
    type: io.kestra.plugin.core.http.Download
    uri: "https://api.example.com/files"
  - id: process_files
    type: io.kestra.plugin.core.flow.EachParallel
    value: "{{ outputs.get_file_list.body | jq('.files[]') }}"
    tasks:
      - id: "process_{{ taskloop.index }}"
        type: io.kestra.plugin.core.http.Download
        uri: "{{ task.value.url }}"
        outputFile: "file_{{ taskloop.index }}.csv"
      - id: "transform_{{ taskloop.index }}"
        type: io.kestra.plugin.scripts.python.Script
        inputFiles:
          input.csv: "{{ outputs['process_' + taskloop.index].uri }}"
        script: |
          import pandas as pd

          df = pd.read_csv('input.csv')
          df.to_parquet('output.parquet')
        dependsOn:
          - "process_{{ taskloop.index }}"
Pattern 2: Task Templates with Macros
# Define reusable task patterns
variables:
  retry_config:
    type: exponential
    maxAttempt: 3
    delay: PT10S
  resource_config:
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
      memory: 1Gi
      cpu: 500m

tasks:
  # The &api_call_template anchor makes this task reusable below
  - &api_call_template
    id: api_call_template
    type: io.kestra.plugin.core.http.Request
    uri: "{{ api_endpoint }}"
    method: POST
    headers:
      Authorization: "Bearer {{ api_token }}"
    retry: "{{ vars.retry_config }}"
    taskRunner: "{{ vars.resource_config }}"

  - id: specific_api_call
    # Inherit from the template; explicit keys below override merged ones
    <<: *api_call_template
    uri: "https://api.example.com/users"
    method: GET
Pattern 3: Task Composition
tasks:
  - id: composite_task
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: step1
        type: io.kestra.plugin.core.log.Log
        message: "Step 1"
      - id: step2
        type: io.kestra.plugin.core.debug.Return
        format: "Step 2 output"
      - id: step3
        type: io.kestra.plugin.core.log.Log
        message: "Step 3 with output: {{ outputs.step2.value }}"
Task Communication: The Secret Sauce
Tasks communicate through inputs and outputs. Understanding this is key to building complex workflows:
tasks:
  - id: producer
    type: io.kestra.plugin.core.debug.Return
    format: "Hello from producer"
  - id: consumer_direct
    type: io.kestra.plugin.core.log.Log
    message: "Direct access: {{ outputs.producer.value }}"
  - id: consumer_with_send
    type: io.kestra.plugin.core.debug.Return
    format: "Forwarded: {{ taskrun.value }}"
    send:
      - producer
  - id: transform_data
    type: io.kestra.plugin.scripts.python.Script
    script: |
      import json

      data = {{ outputs.producer.value | tojson }}
      result = {"transformed": data.upper()}
      print(json.dumps(result))
  - id: use_transformed
    type: io.kestra.plugin.core.log.Log
    message: >
      Transformed: {{ outputs.transform_data.vars.result | fromjson }}
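Conceptually, every `{{ outputs.<task>.<field> }}` expression is a lookup into a per-execution map of task outputs. The toy Python model below makes that data flow concrete; it is a simplified stand-in, not Kestra's actual Pebble templating engine:

```python
import re

# Toy per-execution output store: task id -> outputs dict
outputs = {
    "producer": {"value": "Hello from producer"},
}

def render(template: str) -> str:
    """Resolve {{ outputs.<task>.<field> }} expressions against the store."""
    def resolve(match: re.Match) -> str:
        task_id, field = match.group(1), match.group(2)
        return str(outputs[task_id][field])
    return re.sub(r"\{\{\s*outputs\.([\w-]+)\.(\w+)\s*\}\}", resolve, template)

print(render("Direct access: {{ outputs.producer.value }}"))
# Direct access: Hello from producer
```

The key takeaway: a consumer never talks to a producer directly; it only reads what the producer wrote into the execution's output store.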
Task Error Handling Strategies
tasks:
  - id: risky_operation
    type: io.kestra.plugin.core.http.Download
    uri: "https://unreliable-api.com/data"
    # Strategy 1: Retry
    retry:
      type: exponential
      maxAttempt: 5
      delay: PT30S
      maxDelay: PT5M
    # Strategy 2: Allow failure with fallback
    allowFailure: true

  - id: fallback
    type: io.kestra.plugin.core.debug.Return
    format: "Using cached data"
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ outputs.risky_operation.state.current == 'FAILED' }}"

  - id: continue_with_data
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ outputs.risky_operation.state.current }}"
    cases:
      SUCCESS:
        - id: use_api_data
          type: io.kestra.plugin.core.log.Log
          message: "Using API data"
      FAILED:
        - id: use_fallback_data
          type: io.kestra.plugin.core.log.Log
          message: "Using fallback data"

  # Strategy 3: Dead letter queue
  - id: dead_letter_queue
    type: io.kestra.plugin.core.flow.Subflow
    namespace: system.errors
    flowId: handle-failure
    wait: false  # Don't wait, fire and forget
    transmit:
      - name: error_details
        value: "{{ outputs.risky_operation }}"
      - name: flow_execution_id
        value: "{{ execution.id }}"
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ outputs.risky_operation.state.current == 'FAILED' }}"
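It helps to know what an exponential retry policy actually produces over time. The plain-Python sketch below assumes the common doubling-with-cap behavior; check Kestra's retry documentation for the exact formula it uses:

```python
from datetime import timedelta

def exponential_delays(first: timedelta, max_attempt: int,
                       max_delay: timedelta) -> list[timedelta]:
    """Delay before each retry: double the previous delay, capped at max_delay."""
    delays, current = [], first
    for _ in range(max_attempt - 1):  # the first attempt has no delay
        delays.append(min(current, max_delay))
        current *= 2
    return delays

# Policy from the example: delay PT30S, maxAttempt 5, maxDelay PT5M
delays = exponential_delays(timedelta(seconds=30), 5, timedelta(minutes=5))
print([str(d) for d in delays])  # ['0:00:30', '0:01:00', '0:02:00', '0:04:00']
```

So a `maxAttempt: 5` policy with a 30-second initial delay spends at most 7.5 minutes waiting between attempts, which is worth knowing when you set a task-level `timeout`.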
Task Performance Optimization
tasks:
  - id: optimized_task
    type: io.kestra.plugin.jdbc.postgresql.Query
    sql: "SELECT * FROM large_table"
    # Optimization 1: Resource allocation
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
      memory: 4Gi
      cpu: 2
    # Optimization 2: Parallelism
    maxParallel: 4
    # Optimization 3: Caching
    cache:
      enabled: true
      ttl: PT1H
      inputs:
        - sql
    # Optimization 4: Chunking
    fetchSize: 10000
    chunkSize: 1000
    # Optimization 5: Timeouts
    timeout: PT30M
    connectionTimeout: PT5M
    readTimeout: PT10M
Putting It All Together: Real-World Example
Let's build a complete, production-ready pipeline using all concepts:
# production-data-pipeline.yml
id: customer-lifetime-value
namespace: acme.marketing.analytics
revision: 2
description: |
  # Customer Lifetime Value Pipeline
  Calculates CLV from multiple data sources.

  ## Business Logic
  1. Extract raw data from 3 sources
  2. Clean and standardize
  3. Calculate metrics
  4. Load to data warehouse
  5. Generate reports

  ## Technical Details
  - Processes ~10M records daily
  - Runtime target: < 2 hours
  - Storage: S3 for raw, Snowflake for processed
  - Alerting: Slack on failure

labels:
  team: marketing-analytics
  cost-center: marketing
  data-classification: internal
  sla: business-hours
  environment: production

variables:
  retention_days: 90
  batch_size: 50000
  regions: ["us-east", "eu-west", "ap-southeast"]

inputs:
  - name: processing_date
    type: DATETIME
    defaults: "{{ now() | date('yyyy-MM-dd') }}"
  - name: force_reprocess
    type: BOOLEAN
    defaults: false

tasks:
  # ===== NAMESPACE: Organization =====
  - id: setup_namespace
    type: io.kestra.plugin.core.namespace.UpdateNamespace
    namespace: "{{ namespace.id }}"
    variables:
      last_processed: "{{ inputs.processing_date }}"
      pipeline_version: "{{ flow.revision }}"

  # ===== FLOW: Control Structure =====
  - id: validate_inputs
    type: io.kestra.plugin.core.condition.Expression
    expression: >
      {{ inputs.processing_date | date('yyyy-MM-dd') <= now() | date('yyyy-MM-dd') }}
    errorMessage: "Processing date cannot be in the future"

  - id: check_previous_run
    type: io.kestra.plugin.core.flow.Previous
    namespace: "{{ namespace.id }}"
    flowId: "{{ flow.id }}"
    successOnly: true

  - id: should_run
    type: io.kestra.plugin.core.condition.Expression
    expression: >
      {{ inputs.force_reprocess }} or
      {{ outputs.check_previous_run.executions | length == 0 }} or
      {{ outputs.check_previous_run.executions[0].state.current != 'SUCCESS' }}
  # ===== TASKS: Parallel Processing =====
  - id: extract_parallel
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: extract_sales
        type: io.kestra.plugin.core.http.Download
        uri: "https://api.acme.com/sales/{{ inputs.processing_date }}"
        description: "Extract sales data"
        outputFile: "sales_raw.json"
      - id: extract_customers
        type: io.kestra.plugin.jdbc.postgresql.Query
        sql: >
          SELECT * FROM customers
          WHERE updated_at >= '{{ inputs.processing_date }}'
        description: "Extract customer data"
        fetch: true
      - id: extract_marketing
        type: io.kestra.plugin.core.ssh.Commands
        host: marketing-db.acme.com
        username: etl_user
        privateKey: "{{ secret('MARKETING_SSH_KEY') }}"
        commands:
          - "mysqldump marketing events --where=\"date='{{ inputs.processing_date }}'\""
        description: "Extract marketing events"
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ outputs.should_run.value }}"

  # ===== TASKS: Transformation =====
  - id: transform_data
    type: io.kestra.plugin.scripts.python.Script
    description: "Transform and calculate CLV"
    inputFiles:
      sales.json: "{{ outputs.extract_parallel.outputs.extract_sales.uri }}"
      customers.json: |
        {{ outputs.extract_parallel.outputs.extract_customers.rows | tojson }}
    script: |
      import pandas as pd
      import json
      from datetime import datetime

      # Load data
      sales = pd.read_json('sales.json')
      with open('customers.json') as f:
          customers = pd.DataFrame(json.load(f))

      # Complex transformation logic
      # ... (50 lines of pandas transformations)

      # Calculate CLV
      clv_df = calculate_clv(sales, customers)

      # Save results
      clv_df.to_parquet('clv_calculated.parquet')

      # Generate summary
      summary = {
          'total_customers': len(clv_df),
          'avg_clv': clv_df['clv'].mean(),
          'processing_date': '{{ inputs.processing_date }}',
          'generated_at': datetime.now().isoformat()
      }
      with open('summary.json', 'w') as f:
          json.dump(summary, f)
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
      memory: 8Gi
      cpu: 4
      image: python:3.11-slim
    dependsOn:
      - extract_parallel

  # ===== TASKS: Loading =====
  - id: load_to_warehouse
    type: io.kestra.plugin.jdbc.snowflake.Load
    description: "Load CLV data to Snowflake"
    from: "{{ outputs.transform_data.outputFiles['clv_calculated.parquet'] }}"
    table: customer_clv
    schema: marketing
    warehouse: analytics_wh
    role: etl_role
    stage: "@clv_stage"
    fileFormat: "(TYPE = PARQUET)"
    # Optimize loading
    copyOptions:
      - ON_ERROR = CONTINUE
      - PURGE = TRUE
    dependsOn:
      - transform_data
  # ===== TASKS: Quality Checks =====
  - id: data_quality_check
    type: io.kestra.plugin.core.flow.Subflow
    namespace: acme.data.quality
    flowId: validate-clv
    wait: true
    transmit:
      - name: data_file
        value: "{{ outputs.transform_data.outputFiles['clv_calculated.parquet'] }}"
      - name: expected_rows
        value: "{{ outputs.transform_data.vars.summary.total_customers }}"
    dependsOn:
      - transform_data

  # ===== TASKS: Reporting =====
  - id: generate_reports
    type: io.kestra.plugin.core.flow.EachSequential
    value: "{{ vars.regions }}"
    tasks:
      - id: generate_{{ task.value }}_report
        type: io.kestra.plugin.scripts.python.Script
        description: "Generate regional CLV report"
        script: |
          # Region-specific reporting logic
          generate_region_report('{{ task.value }}')
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
          memory: 2Gi
          cpu: 1
    dependsOn:
      - load_to_warehouse
      - data_quality_check

  # ===== TASKS: Cleanup =====
  - id: cleanup
    type: io.kestra.plugin.fs.Delete
    description: "Clean up temporary files"
    uri: "kestra://{{ execution.id }}/**"
    regex: ".*\\.(json|parquet|tmp)$"
    action: DELETE
    dependsOn:
      - generate_reports

  # ===== TASKS: Finalization =====
  - id: update_metadata
    type: io.kestra.plugin.core.namespace.UpdateNamespace
    namespace: "{{ namespace.id }}"
    variables:
      last_successful_run: "{{ execution.startDate }}"
      records_processed: "{{ outputs.transform_data.vars.summary.total_customers }}"

  - id: send_success_notification
    type: io.kestra.plugin.notifications.slack.SlackExecution
    channel: "#marketing-alerts"
    message: |
      ✅ CLV Pipeline Success
      Date: {{ inputs.processing_date }}
      Customers: {{ outputs.transform_data.vars.summary.total_customers }}
      Duration: {{ execution.duration }}
      Execution: {{ execution.id }}
    dependsOn:
      - cleanup

  # ===== ERROR HANDLING =====
  - id: handle_failure
    type: io.kestra.plugin.notifications.slack.SlackExecution
    channel: "#data-alerts-critical"
    message: |
      🔴 CLV Pipeline Failed
      Error: {{ execution.state.current }}
      Date: {{ inputs.processing_date }}
      Execution: {{ execution.id }}
      Link: {{ serverUrl }}/ui/executions/{{ execution.id }}
    conditions:
      - type: io.kestra.plugin.core.condition.Execution
        states: [FAILED, KILLED, WARNING]

triggers:
  - id: daily_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 2 * * *"  # 2 AM daily
    timezone: UTC
    backfill:
      start: 2024-01-01T00:00:00Z
  - id: manual_trigger
    type: io.kestra.plugin.core.trigger.Webhook
    key: clv-pipeline

timeout: PT4H  # Fail if takes longer than 4 hours

# Flow-level error handling
listeners:
  - conditions:
      - type: io.kestra.plugin.core.condition.Execution
        states: [FAILED]
    tasks:
      - id: emergency_cleanup
        type: io.kestra.plugin.core.flow.Subflow
        namespace: system.emergency
        flowId: cleanup-failed-execution
        transmit:
          - name: execution_id
            value: "{{ execution.id }}"
Best Practices Summary
For Namespaces:
Use hierarchical naming: company.department.function.frequency
Set namespace variables for configuration
Use permissions for access control
Document each namespace's purpose
Keep related flows together
For Flows:
One flow = one business process
Use descriptive IDs and descriptions
Version control your flows
Use labels for filtering and management
Set appropriate timeouts
For Tasks:
Each task should do one thing well
Use descriptive task IDs
Set resource limits appropriately
Implement proper error handling
Use subflows for complex logic
Performance Tips:
Use Parallel tasks for independent operations
Cache expensive operations
Set appropriate batch sizes
Monitor task durations
Use appropriate worker groups
Maintainability:
Keep flows under 20 tasks (use subflows)
Document complex logic
Use consistent naming conventions
Test flows before production
Review flow changes
Common Pitfalls and How to Avoid Them
Pitfall 1: The Monolithic Flow
# BAD: 50+ tasks in one flow
# GOOD: Break into subflows
- id: process_data
  type: io.kestra.plugin.core.flow.Subflow
  namespace: data.processing
  flowId: transform-pipeline
Pitfall 2: Hardcoded Values
# BAD:
uri: "https://api.company.com/data"
# GOOD:
uri: "{{ vars.api_base_url }}/data"
# BETTER:
uri: "{{ secret('API_BASE_URL') }}/data"
Pitfall 3: No Error Handling
# BAD: Task without retry
# GOOD: Add retry configuration
retry:
  type: exponential
  maxAttempt: 3
  delay: PT10S
Pitfall 4: Resource Hogging
# BAD: No resource limits
# GOOD: Set appropriate limits
taskRunner:
  type: io.kestra.plugin.core.runner.Process
  memory: 1Gi
  cpu: 500m
Next Steps: Hands-On Exercise
To solidify your understanding, try this exercise:
Create a namespace structure for your organization
Build a modular flow that:
Extracts data from a public API
Processes it in parallel
Handles errors gracefully
Sends notifications
Optimize the flow for performance
Document everything using flow descriptions and labels
Conclusion
You now understand the three pillars of Kestra architecture:
Namespaces for organization and security
Flows as declarative workflow blueprints
Tasks as the building blocks of execution
Remember: Great orchestration comes from thoughtful design, not complex code. Kestra gives you the tools—how you use them determines your success.
Key mindset shifts:
Think in declarative terms, not imperative
Design for observability from day one
Build modular components
Plan for failure as a normal case
Document everything
In the next article, we'll dive into Building Your First ETL Pipeline, where we'll apply these concepts to solve real-world data problems.
Before the next article, try:
Refactor one of your existing workflows using namespace variables
Create a task template for common operations
Set up proper error handling in a critical flow
Document a flow using the complete metadata format
Happy orchestrating! 🚀