Article 3: Understanding Kestra's Architecture: Flows, Tasks, and Namespaces


Building Blocks of Declarative Orchestration.

Introduction: The Power of Simplicity

Imagine trying to build a house without understanding bricks, beams, and blueprints. That's what using an orchestration tool without understanding its core concepts feels like. Today, we'll transform you from a casual user to an architect who understands every component of Kestra's structure.

By the end of this article, you'll be able to:

  • Design complex workflows with confidence

  • Choose the right task for every job

  • Organize your projects for maximum maintainability

  • Debug issues by understanding the underlying architecture

  • Optimize performance at the structural level


Core Concept 1: Namespaces - Your Organizational Foundation

What Are Namespaces?

Think of namespaces as virtual folders or projects in Kestra. They're not just organizational fluff—they're the backbone of security, access control, and workflow management.

# This isn't just a label, it's an organizational system
namespace: finance.europe.reports

Why Namespaces Matter More Than You Think

Real-world analogy: If Kestra were a company:

  • Namespace = Department (Finance, Marketing, Engineering)

  • Flow = Project within that department

  • Task = Individual task in the project

  • Execution = Project run
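
To make the analogy concrete, here's a minimal sketch of how the pieces map onto a flow definition (the namespace, flow ID, and task here are illustrative):

```yaml
id: quarterly-summary            # Flow = a project
namespace: acme.finance          # Namespace = the department that owns it
tasks:
  - id: fetch-figures            # Task = an individual task in the project
    type: io.kestra.plugin.core.log.Log
    message: "Run {{ execution.id }}"  # Execution = one run of the project
```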

Namespace Hierarchy Best Practices

# BAD: Flat structure
namespace: sales_report_2024

# GOOD: Hierarchical structure
namespace: sales
# or better:
namespace: sales.region.europe.reports
# or even:
namespace: sales.region.europe.reports.monthly

Recommended Structure:

company.department.region.function.frequency
├── acme.finance.europe.reports.monthly
├── acme.marketing.us.campaigns.daily
└── acme.engineering.global.ci-cd.triggered

Practical Implementation

# namespace-structure.yml
id: create-namespaces
namespace: system.admin
description: Create organizational namespace structure

tasks:
  - id: create-finance-namespace
    type: io.kestra.plugin.core.namespace.CreateNamespace
    namespace: acme.finance
    description: "Finance department workflows"
    variables:
      department: finance
      sla: business_hours
      owner: finance-team@acme.com

  - id: create-finance-subnamespaces
    type: io.kestra.plugin.core.flow.EachSequential
    value: ["reports", "reconciliation", "compliance", "forecasting"]
    tasks:
      # The current item is exposed as taskrun.value
      - id: create-subnamespace
        type: io.kestra.plugin.core.namespace.CreateNamespace
        namespace: "acme.finance.{{ taskrun.value }}"
        description: "Finance {{ taskrun.value }} workflows"

  - id: set-permissions
    type: io.kestra.plugin.core.namespace.UpdateNamespace
    namespace: acme.finance.reports
    permissions:
      - username: alice@acme.com
        permission: EXECUTE
      - username: bob@acme.com
        permission: READ
      - group: finance-managers
        permission: OWNER

Namespace Configuration Options

# Advanced namespace configuration
id: configure-namespace
namespace: system.config
description: Configure namespace with advanced settings

tasks:
  - id: create-namespace-with-advanced-config
    type: io.kestra.plugin.core.namespace.CreateNamespace
    namespace: acme.data.sensitive
    description: "Namespace for sensitive data processing"
    variables:
      # Custom metadata
      data_classification: PII
      retention_days: 30
      backup_frequency: daily
      compliance: GDPR

    # Task defaults for this namespace
    taskDefaults:
      - type: io.kestra.plugin.scripts.python.Script
        values:
          taskRunner:
            type: io.kestra.plugin.core.runner.Process
            memory: 1Gi
            cpu: 500m
          timeout: PT30M

    # Flow defaults
    flowDefaults:
      triggers:
        - type: io.kestra.plugin.core.trigger.Schedule
          disabled: false
      logging:
        level: INFO

The Hidden Power: Namespace Variables

# Using namespace variables
id: use-namespace-vars
namespace: acme.finance.reports
description: Demonstrates namespace variables

inputs:
  - name: report_date
    type: DATETIME
    defaults: "{{ now() }}"

tasks:
  - id: log-config
    type: io.kestra.plugin.core.log.Log
    message: |
      Namespace: {{ namespace.id }}
      Variables: {{ namespace.variables | tojson }}
      SLA: {{ namespace.variables.sla }}
      Owner: {{ namespace.variables.owner }}

  - id: process-report
    type: io.kestra.plugin.core.debug.Return
    format: >
      Processing {{ namespace.variables.department }} report
      for {{ inputs.report_date | date('yyyy-MM-dd') }}
      with retention {{ namespace.variables.retention_days }} days

Common Namespace Anti-Patterns

Anti-Pattern 1: Too Many Namespaces

# Don't do this:
namespace: acme.sales.daily.report.january.2024.region.europe
# Result: Unmanageable hierarchy, permission nightmares

Anti-Pattern 2: No Structure

# Don't do this:
namespace: report_for_bob
namespace: alice_special_project
namespace: urgent_thing
# Result: Chaos, no discoverability

Anti-Pattern 3: Using Namespaces as Environments

# Instead of:
namespace: dev.sales.report
namespace: prod.sales.report

# Consider:
namespace: sales.report
# Use flow labels or variables for environment:
labels:
  environment: dev
  region: us-east-1

Core Concept 2: Flows - Your Workflow Blueprint

Anatomy of a Flow

A flow is a declarative blueprint for your workflow. Let's dissect one:

# flow-anatomy.yml
id: data-pipeline                     # Unique identifier within namespace
namespace: acme.data.engineering      # Organizational container
revision: 3                           # Version (auto-managed)
description: "ETL pipeline for customer data"  # Human-readable description
labels:                               # Metadata for filtering/searching
  team: data-engineering
  priority: high
  data-sensitive: true
  cost-center: marketing

tasks:                                # The actual workflow steps
  - id: extract
    # Task configuration...

  - id: transform
    # Task configuration...

  - id: load
    # Task configuration...

triggers:                            # When to run automatically
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 2 * * *"

variables:                           # Flow-level variables
  batch_size: 1000
  target_table: customers_staging

timeout: PT1H                        # Maximum execution time
disabled: false                      # Enable/disable flow

Flow Lifecycle: From YAML to Execution

Understanding this lifecycle is crucial for debugging:

1. Definition (YAML)
   
2. Parsing & Validation
   
3. Storage (PostgreSQL)
   
4. Trigger Detection
   
5. Execution Instantiation
   
6. Task Execution
   
7. State Management
   
8. Completion/Error Handling
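
A flow that fails step 2 never reaches storage, so the quickest sanity check when debugging is whether a minimal definition parses. Only three properties are required: `id`, `namespace`, and a non-empty `tasks` list; everything else in this article is optional:

```yaml
# The smallest flow that survives parsing & validation
id: hello-world
namespace: acme.sandbox
tasks:
  - id: say-hello
    type: io.kestra.plugin.core.log.Log
    message: "Hello from {{ flow.namespace }}.{{ flow.id }}"
```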

Advanced Flow Patterns

Pattern 1: Modular Flows with Subflows

# main-flow.yml
id: main-orchestration
namespace: acme.data

tasks:
  - id: call-subflow
    type: io.kestra.plugin.core.flow.Subflow
    namespace: acme.data.utilities
    flowId: data-validation
    wait: true
    transmit:
      - name: input_file
        value: "{{ outputs.download.uri }}"
    receive:
      - name: validation_report

  - id: continue-processing
    type: io.kestra.plugin.core.log.Log
    message: "Validation result: {{ outputs.call-subflow.outputs.validation_report }}"
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ outputs.call-subflow.state.current == 'SUCCESS' }}"

Pattern 2: Dynamic Flow Generation

# generate-flows.yml
id: flow-generator
namespace: acme.system

tasks:
  - id: get-regions
    type: io.kestra.plugin.jdbc.postgresql.Query
    fetch: true
    sql: "SELECT region_id, region_name FROM sales_regions"

  - id: create-region-flows
    type: io.kestra.plugin.core.flow.EachParallel
    value: "{{ outputs.get-regions.rows }}"
    tasks:
      # Each query row is exposed as taskrun.value (a JSON object)
      - id: create-region-flow
        type: io.kestra.plugin.core.flow.Create
        namespace: acme.sales.region
        flow:
          id: process_{{ json(taskrun.value).region_name | lower }}
          namespace: acme.sales.region
          description: "Process sales for {{ json(taskrun.value).region_name }}"
          tasks:
            - id: extract
              type: io.kestra.plugin.core.http.Download
              uri: "https://api.acme.com/sales/{{ json(taskrun.value).region_id }}/{{ execution.startDate | date('yyyy-MM-dd') }}"

Pattern 3: Flow Templating

# flow-template.yml
id: report-template
namespace: acme.templates
description: Template for all report flows

variables:
  report_name: REQUIRED
  source_url: REQUIRED
  target_database: "data_warehouse"
  schedule_cron: "0 2 * * *"

tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: "{{ vars.source_url }}/{{ execution.startDate | date('yyyy-MM-dd') }}"

  - id: transform
    type: io.kestra.plugin.scripts.python.Script
    script: |
      # Transform logic for {{ vars.report_name }}
      print("Processing {{ vars.report_name }}")

  - id: load
    type: io.kestra.plugin.jdbc.postgresql.Load
    table: "{{ vars.report_name | lower }}_reports"
    from: "{{ outputs.transform.outputFiles['output.parquet'] }}"

triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "{{ vars.schedule_cron }}"

Flow Metadata and Best Practices

# flow-with-metadata.yml
id: production-pipeline
namespace: acme.production
revision: 1
description: |
  # Production Data Pipeline

  ## Purpose
  Processes customer transactions for billing.

  ## Owner
  Data Engineering Team (data-team@acme.com)

  ## SLA
  - Must complete within 2 hours
  - Runs daily at 2 AM UTC
  - Alert on-call if fails

  ## Data Flow
  1. Extract from Payment API
  2. Validate transactions
  3. Enrich with customer data
  4. Load to Data Warehouse
  5. Generate reconciliation report

documentation: |
  ## Technical Details

  ### Dependencies
  - Payment API v2.1
  - Customer Service v1.3
  - Snowflake DW

  ### Error Handling
  - Retry 3 times on API failures
  - Dead letter queue for invalid records
  - Slack alert on permanent failure

  ### Performance
  - Processes ~1M records/day
  - Average runtime: 45 minutes
  - Peak memory: 2GB

labels:
  environment: production
  tier: 1
  team: data-engineering
  cost-center: billing
  compliance: pci-dss
  data-retention: 7-years

# Audit trail
created: 2024-01-15T10:30:00Z
updated: 2024-01-20T14:45:00Z
updatedBy: alice@acme.com

Flow Versioning and Deployment Strategies

# deployment-strategy.yml
id: deploy-flow-version
namespace: acme.deployment

inputs:
  - name: flow_id
    type: STRING
    required: true
  - name: namespace
    type: STRING
    required: true
  - name: yaml_content
    type: STRING
    required: true
  - name: deployment_strategy
    type: STRING
    choices: [blue-green, canary, immediate]
    defaults: immediate

tasks:
  - id: validate-flow
    type: io.kestra.plugin.core.flow.Validate
    flow: "{{ inputs.yaml_content }}"

  - id: get-current-flow
    type: io.kestra.plugin.core.flow.Get
    namespace: "{{ inputs.namespace }}"
    flowId: "{{ inputs.flow_id }}"
    ignoreMissing: true

  - id: deploy-immediate
    type: io.kestra.plugin.core.flow.Update
    namespace: "{{ inputs.namespace }}"
    flowId: "{{ inputs.flow_id }}"
    flow: "{{ inputs.yaml_content }}"
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ inputs.deployment_strategy == 'immediate' }}"

  - id: deploy-blue-green
    type: io.kestra.plugin.core.flow.Create
    namespace: "{{ inputs.namespace }}"
    flow:
      id: "{{ inputs.flow_id }}-blue"
      namespace: "{{ inputs.namespace }}"
      # ... flow content with -blue suffix
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ inputs.deployment_strategy == 'blue-green' }}"

Core Concept 3: Tasks - The Workhorses of Your Workflow

Task Taxonomy: Understanding the Hierarchy

Task Types
├── Core Tasks (Built-in)
│   ├── Control Flow
│   │   ├── Parallel
│   │   ├── Sequential
│   │   ├── Each (Loop)
│   │   └── Conditional
│   ├── Data Movement
│   │   ├── HTTP
│   │   ├── Files
│   │   └── Databases
│   └── Utilities
│       ├── Logging
│       ├── Debugging
│       └── Notifications
├── Script Tasks
│   ├── Python
│   ├── Bash
│   ├── Node.js
│   └── R
├── Plugin Tasks
│   ├── Cloud (AWS, GCP, Azure)
│   ├── Databases (PostgreSQL, Snowflake, BigQuery)
│   ├── Messaging (Kafka, RabbitMQ)
│   └── Specialized (dbt, Great Expectations)
└── Custom Tasks
    └── Java/Scala plugins

Task Anatomy: Every Task Explained

tasks:
  - id: download_file                    # Required: Unique identifier
    type: io.kestra.plugin.core.http.Download  # Required: Task type
    description: "Download sales data"   # Optional: Human description
    uri: "https://api.example.com/data"  # Required: Type-specific params

    # Execution Control
    timeout: PT5M                        # Max execution time
    retry:                               # Retry configuration
      type: exponential
      maxAttempt: 3
      interval: PT10S

    # Error Handling
    allowFailure: false                  # Continue on failure?
    fatalError: ["CONNECTION_ERROR"]     # Specific errors that stop flow

    # Resource Management
    taskRunner:                          # How to run the task
      type: io.kestra.plugin.core.runner.Process
      memory: 512Mi
      cpu: 250m

    # Input/Output
    inputFiles:                          # Files available to task
      config.json: |
        {"key": "value"}
    outputFiles:                         # Files produced by task
      - "*.parquet"

    # Dependencies
    dependsOn:                           # Wait for these tasks
      - validate_config
    send:                                # Data to pass
      - key: url
        value: "{{ outputs.config_reader.api_url }}"

    # Metadata
    labels:                              # Task-specific labels
      data-sensitive: true
      cost-center: marketing

    # Advanced
    before: []                           # Hooks
    after: []
    workerGroup: heavy-lifting           # Specific worker group

Task Execution States

Understanding these states is crucial for debugging. Every task run moves through a lifecycle: it is CREATED when queued, becomes RUNNING once a worker picks it up, and terminates in SUCCESS, WARNING, FAILED, or KILLED (with intermediate states such as PAUSED and RETRYING along the way). The conditions used throughout this article key off these states via expressions like {{ outputs.some_task.state.current }}.
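
Besides conditions on individual tasks, Kestra flows support an `errors` branch that runs only when the main task list ends in a failed state. A minimal sketch (the URI and IDs are illustrative):

```yaml
id: state-aware-flow
namespace: acme.demo

tasks:
  - id: might-fail
    type: io.kestra.plugin.core.http.Download
    uri: "https://example.com/data.csv"

# Runs only if a task above ends in a failed state
errors:
  - id: report-failure
    type: io.kestra.plugin.core.log.Log
    message: "Execution {{ execution.id }} failed in state {{ execution.state.current }}"
```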

Advanced Task Patterns

Pattern 1: Dynamic Task Generation

tasks:
  - id: get_file_list
    type: io.kestra.plugin.core.http.Request
    uri: "https://api.example.com/files"

  - id: process_files
    type: io.kestra.plugin.core.flow.EachParallel
    value: "{{ outputs.get_file_list.body | jq('.files[]') }}"
    tasks:
      # Child tasks run sequentially within each iteration;
      # the current item is exposed as taskrun.value
      - id: download_file
        type: io.kestra.plugin.core.http.Download
        uri: "{{ json(taskrun.value).url }}"

      - id: transform_file
        type: io.kestra.plugin.scripts.python.Script
        inputFiles:
          input.csv: "{{ outputs.download_file.uri }}"
        script: |
          import pandas as pd
          df = pd.read_csv('input.csv')
          df.to_parquet('output.parquet')
        outputFiles:
          - "output.parquet"

Pattern 2: Task Templates with Macros

# Define reusable task patterns
variables:
  retry_config:
    type: exponential
    maxAttempt: 3
    interval: PT10S

  resource_config:
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
      memory: 1Gi
      cpu: 500m

tasks:
  - &api_call_template                  # YAML anchor for reuse
    id: api_call_template
    type: io.kestra.plugin.core.http.Request
    uri: "{{ vars.api_endpoint }}"
    method: POST
    headers:
      Authorization: "Bearer {{ secret('API_TOKEN') }}"
    retry: "{{ vars.retry_config }}"
    taskRunner: "{{ vars.resource_config }}"

  - id: specific_api_call
    # Inherit from the template via a YAML merge key
    <<: *api_call_template
    # Override specific values
    uri: "https://api.example.com/users"
    method: GET

Pattern 3: Task Composition

tasks:
  - id: composite_task
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: step1
        type: io.kestra.plugin.core.log.Log
        message: "Step 1"

      - id: step2
        type: io.kestra.plugin.core.debug.Return
        format: "Step 2 output"

      - id: step3
        type: io.kestra.plugin.core.log.Log
        message: "Step 3 with output: {{ outputs.step2.value }}"

Task Communication: The Secret Sauce

Tasks communicate through inputs and outputs. Understanding this is key to building complex workflows:

tasks:
  - id: producer
    type: io.kestra.plugin.core.debug.Return
    format: "Hello from producer"

  - id: consumer_direct
    type: io.kestra.plugin.core.log.Log
    message: "Direct access: {{ outputs.producer.value }}"

  - id: consumer_with_send
    type: io.kestra.plugin.core.debug.Return
    format: "Forwarded: {{ taskrun.value }}"
    send:
      - producer

  - id: transform_data
    type: io.kestra.plugin.scripts.python.Script
    script: |
      import json
      data = {{ outputs.producer.value | tojson }}
      result = {"transformed": data.upper()}
      print(json.dumps(result))

  - id: use_transformed
    type: io.kestra.plugin.core.log.Log
    message: >
      Transformed: {{ outputs.transform_data.vars.result | fromjson }}

Task Error Handling Strategies

tasks:
  - id: risky_operation
    type: io.kestra.plugin.core.http.Download
    uri: "https://unreliable-api.com/data"

    # Strategy 1: Retry
    retry:
      type: exponential
      maxAttempt: 5
      interval: PT30S
      maxInterval: PT5M

    # Strategy 2: Allow failure with fallback
    allowFailure: true

  - id: fallback
    type: io.kestra.plugin.core.debug.Return
    format: "Using cached data"
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ outputs.risky_operation.state.current == 'FAILED' }}"

  - id: continue_with_data
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ outputs.risky_operation.state.current }}"
    cases:
      SUCCESS:
        - id: use_api_data
          type: io.kestra.plugin.core.log.Log
          message: "Using API data"
      FAILED:
        - id: use_fallback_data
          type: io.kestra.plugin.core.log.Log
          message: "Using fallback data"

  # Strategy 3: Dead letter queue
  - id: dead_letter_queue
    type: io.kestra.plugin.core.flow.Subflow
    namespace: system.errors
    flowId: handle-failure
    wait: false  # Don't wait, fire and forget
    transmit:
      - name: error_details
        value: "{{ outputs.risky_operation }}"
      - name: flow_execution_id
        value: "{{ execution.id }}"
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ outputs.risky_operation.state.current == 'FAILED' }}"

Task Performance Optimization

tasks:
  - id: optimized_task
    type: io.kestra.plugin.jdbc.postgresql.Query
    sql: "SELECT * FROM large_table"

    # Optimization 1: Resource allocation
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
      memory: 4Gi
      cpu: 2

    # Optimization 2: Parallelism
    maxParallel: 4

    # Optimization 3: Caching
    cache:
      enabled: true
      ttl: PT1H
      inputs:
        - sql

    # Optimization 4: Chunking
    fetchSize: 10000
    chunkSize: 1000

    # Optimization 5: Timeouts
    timeout: PT30M
    connectionTimeout: PT5M
    readTimeout: PT10M

Putting It All Together: Real-World Example

Let's build a complete, production-ready pipeline using all concepts:

# production-data-pipeline.yml
id: customer-lifetime-value
namespace: acme.marketing.analytics
revision: 2
description: |
  # Customer Lifetime Value Pipeline
  Calculates CLV from multiple data sources.

  ## Business Logic
  1. Extract raw data from 3 sources
  2. Clean and standardize
  3. Calculate metrics
  4. Load to data warehouse
  5. Generate reports

  ## Technical Details
  - Processes ~10M records daily
  - Runtime target: < 2 hours
  - Storage: S3 for raw, Snowflake for processed
  - Alerting: Slack on failure

labels:
  team: marketing-analytics
  cost-center: marketing
  data-classification: internal
  sla: business-hours
  environment: production

variables:
  retention_days: 90
  batch_size: 50000
  regions: ["us-east", "eu-west", "ap-southeast"]

inputs:
  - name: processing_date
    type: DATETIME
    defaults: "{{ now() | date('yyyy-MM-dd') }}"
  - name: force_reprocess
    type: BOOLEAN
    defaults: false

tasks:
  # ===== NAMESPACE: Organization =====
  - id: setup_namespace
    type: io.kestra.plugin.core.namespace.UpdateNamespace
    namespace: "{{ namespace.id }}"
    variables:
      last_processed: "{{ inputs.processing_date }}"
      pipeline_version: "{{ flow.revision }}"

  # ===== FLOW: Control Structure =====
  - id: validate_inputs
    type: io.kestra.plugin.core.condition.Expression
    expression: >
      {{ inputs.processing_date | date('yyyy-MM-dd') <= now() | date('yyyy-MM-dd') }}
    errorMessage: "Processing date cannot be in the future"

  - id: check_previous_run
    type: io.kestra.plugin.core.flow.Previous
    namespace: "{{ namespace.id }}"
    flowId: "{{ flow.id }}"
    successOnly: true

  - id: should_run
    type: io.kestra.plugin.core.condition.Expression
    expression: >
      {{ inputs.force_reprocess }} or
      {{ outputs.check_previous_run.executions | length == 0 }} or
      {{ outputs.check_previous_run.executions[0].state.current != 'SUCCESS' }}

  # ===== TASKS: Parallel Processing =====
  - id: extract_parallel
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: extract_sales
        type: io.kestra.plugin.core.http.Download
        uri: "https://api.acme.com/sales/{{ inputs.processing_date }}"
        description: "Extract sales data"
        outputFile: "sales_raw.json"

      - id: extract_customers
        type: io.kestra.plugin.jdbc.postgresql.Query
        sql: >
          SELECT * FROM customers 
          WHERE updated_at >= '{{ inputs.processing_date }}'
        description: "Extract customer data"
        fetch: true

      - id: extract_marketing
        type: io.kestra.plugin.core.ssh.Commands
        host: marketing-db.acme.com
        username: etl_user
        privateKey: "{{ secret('MARKETING_SSH_KEY') }}"
        commands:
          - "mysqldump marketing events --where=\"date='{{ inputs.processing_date }}'\""
        description: "Extract marketing events"

    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ outputs.should_run.value }}"

  # ===== TASKS: Transformation =====
  - id: transform_data
    type: io.kestra.plugin.scripts.python.Script
    description: "Transform and calculate CLV"
    inputFiles:
      sales.json: "{{ outputs.extract_parallel.outputs.extract_sales.uri }}"
      customers.json: |
        {{ outputs.extract_parallel.outputs.extract_customers.rows | tojson }}
    script: |
      import pandas as pd
      import json
      from datetime import datetime

      # Load data
      sales = pd.read_json('sales.json')
      customers = pd.read_json('customers.json')

      # Complex transformation logic
      # ... (50 lines of pandas transformations)

      # Calculate CLV
      clv_df = calculate_clv(sales, customers)

      # Save results
      clv_df.to_parquet('clv_calculated.parquet')

      # Generate summary
      summary = {
          'total_customers': len(clv_df),
          'avg_clv': clv_df['clv'].mean(),
          'processing_date': '{{ inputs.processing_date }}',
          'generated_at': datetime.now().isoformat()
      }

      with open('summary.json', 'w') as f:
          json.dump(summary, f)

    taskRunner:
      type: io.kestra.plugin.core.runner.Process
      memory: 8Gi
      cpu: 4
      image: python:3.11-slim

    dependsOn:
      - extract_parallel

  # ===== TASKS: Loading =====
  - id: load_to_warehouse
    type: io.kestra.plugin.jdbc.snowflake.Load
    description: "Load CLV data to Snowflake"
    from: "{{ outputs.transform_data.outputFiles['clv_calculated.parquet'] }}"
    table: customer_clv
    schema: marketing
    warehouse: analytics_wh
    role: etl_role
    stage: "@clv_stage"
    fileFormat: "(TYPE = PARQUET)"

    # Optimize loading
    copyOptions:
      - ON_ERROR = CONTINUE
      - PURGE = TRUE

    dependsOn:
      - transform_data

  # ===== TASKS: Quality Checks =====
  - id: data_quality_check
    type: io.kestra.plugin.core.flow.Subflow
    namespace: acme.data.quality
    flowId: validate-clv
    wait: true
    transmit:
      - name: data_file
        value: "{{ outputs.transform_data.outputFiles['clv_calculated.parquet'] }}"
      - name: expected_rows
        value: "{{ outputs.transform_data.vars.summary.total_customers }}"

    dependsOn:
      - transform_data

  # ===== TASKS: Reporting =====
  - id: generate_reports
    type: io.kestra.plugin.core.flow.EachSequential
    value: "{{ vars.regions }}"
    tasks:
      # The current region is exposed as taskrun.value
      - id: generate_region_report
        type: io.kestra.plugin.scripts.python.Script
        description: "Generate regional CLV report"
        script: |
          # Region-specific reporting logic
          generate_region_report('{{ taskrun.value }}')
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
          memory: 2Gi
          cpu: 1

    dependsOn:
      - load_to_warehouse
      - data_quality_check

  # ===== TASKS: Cleanup =====
  - id: cleanup
    type: io.kestra.plugin.fs.Delete
    description: "Clean up temporary files"
    uri: "kestra://{{ execution.id }}/**"
    regex: ".*\\.(json|parquet|tmp)$"
    action: DELETE

    dependsOn:
      - generate_reports

  # ===== TASKS: Finalization =====
  - id: update_metadata
    type: io.kestra.plugin.core.namespace.UpdateNamespace
    namespace: "{{ namespace.id }}"
    variables:
      last_successful_run: "{{ execution.startDate }}"
      records_processed: "{{ outputs.transform_data.vars.summary.total_customers }}"

  - id: send_success_notification
    type: io.kestra.plugin.notifications.slack.SlackExecution
    channel: "#marketing-alerts"
    message: |
      ✅ CLV Pipeline Success
      Date: {{ inputs.processing_date }}
      Customers: {{ outputs.transform_data.vars.summary.total_customers }}
      Duration: {{ execution.duration }}
      Execution: {{ execution.id }}

    dependsOn:
      - cleanup

  # ===== ERROR HANDLING =====
  - id: handle_failure
    type: io.kestra.plugin.notifications.slack.SlackExecution
    channel: "#data-alerts-critical"
    message: |
      🔴 CLV Pipeline Failed
      Error: {{ execution.state.current }}
      Date: {{ inputs.processing_date }}
      Execution: {{ execution.id }}
      Link: {{ serverUrl }}/ui/executions/{{ execution.id }}

    conditions:
      - type: io.kestra.plugin.core.condition.Execution
        states: [FAILED, KILLED, WARNING]

triggers:
  - id: daily_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 2 * * *"  # 2 AM daily
    timezone: UTC
    backfill:
      start: 2024-01-01T00:00:00Z

  - id: manual_trigger
    type: io.kestra.plugin.core.trigger.Webhook
    key: clv-pipeline

timeout: PT4H  # Fail if takes longer than 4 hours

# Flow-level error handling
listeners:
  - conditions:
      - type: io.kestra.plugin.core.condition.Execution
        states: [FAILED]
    tasks:
      - id: emergency_cleanup
        type: io.kestra.plugin.core.flow.Subflow
        namespace: system.emergency
        flowId: cleanup-failed-execution
        transmit:
          - name: execution_id
            value: "{{ execution.id }}"

Best Practices Summary

For Namespaces:

  1. Use hierarchical naming: company.department.function.frequency

  2. Set namespace variables for configuration

  3. Use permissions for access control

  4. Document each namespace's purpose

  5. Keep related flows together

For Flows:

  1. One flow = one business process

  2. Use descriptive IDs and descriptions

  3. Version control your flows

  4. Use labels for filtering and management

  5. Set appropriate timeouts

For Tasks:

  1. Each task should do one thing well

  2. Use descriptive task IDs

  3. Set resource limits appropriately

  4. Implement proper error handling

  5. Use subflows for complex logic

Performance Tips:

  1. Use Parallel tasks for independent operations

  2. Cache expensive operations

  3. Set appropriate batch sizes

  4. Monitor task durations

  5. Use appropriate worker groups

Maintainability:

  1. Keep flows under 20 tasks (use subflows)

  2. Document complex logic

  3. Use consistent naming conventions

  4. Test flows before production

  5. Review flow changes


Common Pitfalls and How to Avoid Them

Pitfall 1: The Monolithic Flow

# BAD: 50+ tasks in one flow
# GOOD: Break into subflows
- id: process_data
  type: io.kestra.plugin.core.flow.Subflow
  namespace: data.processing
  flowId: transform-pipeline

Pitfall 2: Hardcoded Values

# BAD:
uri: "https://api.company.com/data"
# GOOD:
uri: "{{ vars.api_base_url }}/data"
# BETTER:
uri: "{{ secret('API_BASE_URL') }}/data"

Pitfall 3: No Error Handling

# BAD: Task without retry
# GOOD: Add retry configuration
retry:
  type: exponential
  maxAttempt: 3
  interval: PT10S

Pitfall 4: Resource Hogging

# BAD: No resource limits
# GOOD: Set appropriate limits
taskRunner:
  type: io.kestra.plugin.core.runner.Process
  memory: 1Gi
  cpu: 500m

Next Steps: Hands-On Exercise

To solidify your understanding, try this exercise:

  1. Create a namespace structure for your organization

  2. Build a modular flow that:

    • Extracts data from a public API

    • Processes it in parallel

    • Handles errors gracefully

    • Sends notifications

  3. Optimize the flow for performance

  4. Document everything using flow descriptions and labels
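
If you want a starting point for step 2, here is a skeleton with the four requirements marked; the API endpoint and task IDs are placeholders to replace with your own:

```yaml
id: exercise-pipeline
namespace: acme.training.exercises    # 1. your namespace structure
description: "Exercise: extract, parallel processing, error handling, notifications"
labels:
  purpose: training                   # 4. document with labels

tasks:
  - id: extract                       # extract from a public API
    type: io.kestra.plugin.core.http.Download
    uri: "https://api.example.com/data"   # placeholder endpoint
    retry:                            # handle errors gracefully
      type: constant
      maxAttempt: 3
      interval: PT10S

  - id: process                       # process in parallel
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: branch-a
        type: io.kestra.plugin.core.log.Log
        message: "Branch A processing {{ outputs.extract.uri }}"
      - id: branch-b
        type: io.kestra.plugin.core.log.Log
        message: "Branch B processing {{ outputs.extract.uri }}"

errors:
  - id: notify                        # send notifications (swap for Slack, email, etc.)
    type: io.kestra.plugin.core.log.Log
    message: "Pipeline {{ execution.id }} failed"
```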


Conclusion

You now understand the three pillars of Kestra architecture:

  1. Namespaces for organization and security

  2. Flows as declarative workflow blueprints

  3. Tasks as the building blocks of execution

Remember: Great orchestration comes from thoughtful design, not complex code. Kestra gives you the tools—how you use them determines your success.

Key mindset shifts:

  • Think in declarative terms, not imperative

  • Design for observability from day one

  • Build modular components

  • Plan for failure as a normal case

  • Document everything

In the next article, we'll dive into Building Your First ETL Pipeline, where we'll apply these concepts to solve real-world data problems.

Before the next article, try:

  1. Refactor one of your existing workflows using namespace variables

  2. Create a task template for common operations

  3. Set up proper error handling in a critical flow

  4. Document a flow using the complete metadata format

Happy orchestrating! 🚀