Article 5: Advanced Workflow Patterns in Kestra.

Mastering Complex Orchestration Scenarios.

Introduction: The Orchestrator's Toolkit

Imagine you're conducting a symphony. You don't just wave your baton - you cue sections, adjust tempo, handle surprises, and ensure harmony. That's what advanced workflow patterns give you in Kestra: the ability to conduct complex data orchestrations with precision and grace.

In this article, we'll move beyond simple linear pipelines into sophisticated orchestration patterns that handle real-world complexity. You'll learn patterns used by top data teams to build resilient, scalable, and intelligent workflows.

What makes a workflow "advanced"?

  • Intelligence: Making decisions based on data

  • Resilience: Graceful handling of the unexpected

  • Efficiency: Optimal resource usage and parallelism

  • Adaptability: Dynamic behavior based on context

  • Maintainability: Clean, reusable patterns


Pattern 1: Dynamic Parallel Processing

Beyond Simple Parallel Tasks

While Kestra's Parallel task is great for fixed concurrency, real-world scenarios often require dynamic parallelism based on data volume or external factors.

id: dynamic-parallel-processing
namespace: acme.advanced
description: Dynamic parallelism based on data volume

tasks:
  - id: discover_files
    type: io.kestra.plugin.core.http.Request
    uri: "https://api.acme.com/files"
    store: true

  - id: calculate_optimal_parallelism
    type: io.kestra.plugin.scripts.python.Script
    inputFiles:
      files.json: "{{ outputs.discover_files.uri }}"
    script: |
      import json
      import math

      with open('files.json', 'r') as f:
          files = json.load(f)

      total_files = len(files['files'])
      avg_file_size = sum(f['size'] for f in files['files']) / total_files if total_files > 0 else 0

      # Dynamic parallelism calculation
      # Rule: 1 worker per 100MB, max 10 workers
      max_workers = min(10, max(1, math.ceil((total_files * avg_file_size) / (100 * 1024 * 1024))))

      # Split files into batches
      batch_size = math.ceil(total_files / max_workers)
      batches = []
      for i in range(0, total_files, batch_size):
          batches.append(files['files'][i:i + batch_size])

      print(json.dumps({
          'max_workers': max_workers,
          'batch_size': batch_size,
          'batches': batches,
          'total_files': total_files
      }))

  - id: process_in_dynamic_parallel
    type: io.kestra.plugin.core.flow.EachParallel
    value: "{{ outputs.calculate_optimal_parallelism.vars.batches }}"
    maxParallel: "{{ outputs.calculate_optimal_parallelism.vars.max_workers }}"
    tasks:
      - id: "process_batch_{{ taskloop.index }}"
        type: io.kestra.plugin.scripts.python.Script
        description: "Processing batch {{ taskloop.index }} with {{ task.value | length }} files"
        inputFiles:
          batch_files.json: |
            {{ task.value | tojson }}
        script: |
          import json
          import time

          with open('batch_files.json', 'r') as f:
              files = json.load(f)

          # Simulate processing
          results = []
          for file in files:
              time.sleep(0.1)  # Simulate work
              results.append({
                  'file': file['name'],
                  'processed': True,
                  'size': file['size']
              })

          print(f"Processed {len(results)} files")
          print(json.dumps({'results': results}))
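
The sizing rule buried inside calculate_optimal_parallelism is easier to check in isolation. Here is a minimal standalone sketch of the same arithmetic, assuming the file descriptors and the 100 MB-per-worker rule from the flow above; everything else is illustrative:

import math

def plan_batches(files, bytes_per_worker=100 * 1024 * 1024, max_workers=10):
    """Split a list of file descriptors into batches sized for parallel workers."""
    if not files:
        return {"max_workers": 0, "batches": []}
    total_bytes = sum(f["size"] for f in files)
    # One worker per 100 MB of data, capped at max_workers, never fewer than 1.
    workers = min(max_workers, max(1, math.ceil(total_bytes / bytes_per_worker)))
    batch_size = math.ceil(len(files) / workers)
    batches = [files[i:i + batch_size] for i in range(0, len(files), batch_size)]
    return {"max_workers": workers, "batches": batches}

# Example: 25 files of 40 MB each -> ~1 GB total -> 10 workers, 3 files per batch.
files = [{"name": f"file_{i}.csv", "size": 40 * 1024 * 1024} for i in range(25)]
print(plan_batches(files)["max_workers"])  # 10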

Fan-Out/Fan-In Pattern

This pattern distributes work across many workers (fan-out) and aggregates results (fan-in).

id: fan-out-fan-in-pattern
namespace: acme.advanced
description: Distribute work, process in parallel, aggregate results

tasks:
  # Fan-Out: Distribute work
  - id: generate_work_items
    type: io.kestra.plugin.core.debug.Return
    format: >
      {{ range(1, 101) | map('toString') | list | tojson }}

  # Parallel Processing
  - id: parallel_processing
    type: io.kestra.plugin.core.flow.EachParallel
    value: "{{ outputs.generate_work_items.value }}"
    maxParallel: 20
    tasks:
      - id: "process_item_{{ task.value }}"
        type: io.kestra.plugin.scripts.python.Script
        script: |
          import time
          import random

          # Simulate variable processing time
          processing_time = random.uniform(0.1, 2.0)
          time.sleep(processing_time)

          # Generate result
          result = {
              'item': {{ task.value }},
              'processed_at': time.time(),
              'processing_time': processing_time,
              'result': {{ task.value }} * 2  # Simple transformation
          }

          print(f"Processed item {{ task.value }} in {processing_time:.2f}s")

          # Return result
          import json
          print(json.dumps(result))

  # Fan-In: Aggregate results
  - id: aggregate_results
    type: io.kestra.plugin.scripts.python.Script
    description: "Aggregate all parallel results"
    script: |
      import json
      import statistics

      # Collect all results from parallel tasks
      all_results = []
      processing_times = []

      {% for i in range(1, 101) %}
      try:
          result_json = '''{{ outputs.parallel_processing.outputs["process_item_" + i|string].vars | default('{}') | tojson }}'''
          if result_json:
              result = json.loads(result_json)
              if 'result' in str(result):  # Check if result exists
                  # Extract the actual result from the output
                  # This depends on how your script outputs data
                  all_results.append(result)
                  if 'processing_time' in result:
                      processing_times.append(result['processing_time'])
      except:
          pass
      {% endfor %}

      # Calculate statistics
      stats = {
          'total_processed': len(all_results),
          'average_processing_time': statistics.mean(processing_times) if processing_times else 0,
          'total_processing_time': sum(processing_times),
          'throughput': len(all_results) / (sum(processing_times) if processing_times else 1),
          'results_summary': {
              'min': min(r['result'] for r in all_results) if all_results else 0,
              'max': max(r['result'] for r in all_results) if all_results else 0,
              'avg': statistics.mean(r['result'] for r in all_results) if all_results else 0
          }
      }

      print(json.dumps(stats, indent=2))

    dependsOn:
      - parallel_processing
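
The fan-in step above reconstructs each task's output with a templated loop, which is verbose. The underlying shape is simply "run N items concurrently, then reduce the results"; here is a minimal Python sketch of that shape using a thread pool (the process_item transformation is a placeholder for whatever each parallel task does):

import statistics
from concurrent.futures import ThreadPoolExecutor

def process_item(item: int) -> dict:
    # Placeholder transformation standing in for one parallel task.
    return {"item": item, "result": item * 2}

work_items = list(range(1, 101))

# Fan-out: process up to 20 items at a time.
with ThreadPoolExecutor(max_workers=20) as pool:
    results = list(pool.map(process_item, work_items))

# Fan-in: aggregate the results.
summary = {
    "total_processed": len(results),
    "min": min(r["result"] for r in results),
    "max": max(r["result"] for r in results),
    "avg": statistics.mean(r["result"] for r in results),
}
print(summary)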

Pattern 2: Circuit Breaker Pattern

Preventing Cascading Failures

The circuit breaker pattern stops a workflow from repeatedly hammering a failing dependency: after a threshold of failures the breaker "opens" and further calls are short-circuited until a cooldown has passed, much like an electrical circuit breaker.

id: circuit-breaker-pattern
namespace: acme.advanced
description: Implement circuit breaker for unreliable external services

variables:
  circuit_breaker_state_key: "circuit_breaker:api.acme.com"
  failure_threshold: 5
  reset_timeout: PT5M

tasks:
  - id: check_circuit_breaker
    type: io.kestra.plugin.core.cache.Get
    key: "{{ vars.circuit_breaker_state_key }}"

  - id: evaluate_circuit_state
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ outputs.check_circuit_breaker.value.state | default('closed') }}"
    cases:
      open:
        - id: circuit_open_check_timeout
          type: io.kestra.plugin.core.condition.Expression
          expression: >
            {{ now().timestamp() - outputs.check_circuit_breaker.value.timestamp > 300 }}

        - id: attempt_reset
          type: io.kestra.plugin.core.log.Log
          message: "Circuit breaker open for {{ vars.reset_timeout }}. Testing if service recovered..."
          conditions:
            - type: io.kestra.plugin.core.condition.Expression
              expression: "{{ outputs.circuit_open_check_timeout.value }}"

        - id: skip_service_call
          type: io.kestra.plugin.core.log.Log
          message: "Circuit breaker is OPEN - skipping service call"
          conditions:
            - type: io.kestra.plugin.core.condition.Expression
              expression: "{{ not outputs.circuit_open_check_timeout.value }}"

  - id: call_external_service
    type: io.kestra.plugin.core.http.Request
    uri: "https://api.acme.com/unstable-endpoint"
    method: GET
    retry:
      type: exponential
      maxAttempt: 2
      delay: PT5S
    timeout: PT30S
    allowFailure: true
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: >
          {{ outputs.check_circuit_breaker.value.state != 'open' or 
             outputs.circuit_open_check_timeout.value == true }}

  - id: update_circuit_state
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ outputs.call_external_service.state.current }}"
    cases:
      SUCCESS:
        - id: reset_circuit_on_success
          type: io.kestra.plugin.core.cache.Put
          key: "{{ vars.circuit_breaker_state_key }}"
          value: |
            {
              "state": "closed",
              "failure_count": 0,
              "last_success": "{{ now() }}",
              "timestamp": {{ now().timestamp() }}
            }
          ttl: PT1H

      FAILED:
        - id: get_current_failure_count
          type: io.kestra.plugin.core.cache.Get
          key: "{{ vars.circuit_breaker_state_key }}_failures"
          defaultValue: "0"

        - id: increment_failure_count
          type: io.kestra.plugin.core.cache.Put
          key: "{{ vars.circuit_breaker_state_key }}_failures"
          value: "{{ outputs.get_current_failure_count.value | int + 1 }}"
          ttl: PT1H

        - id: check_if_should_trip
          type: io.kestra.plugin.core.condition.Expression
          expression: >
            {{ (outputs.get_current_failure_count.value | int + 1) >= vars.failure_threshold }}

        - id: trip_circuit_breaker
          type: io.kestra.plugin.core.cache.Put
          key: "{{ vars.circuit_breaker_state_key }}"
          value: |
            {
              "state": "open",
              "tripped_at": "{{ now() }}",
              "failure_count": {{ outputs.get_current_failure_count.value | int + 1 }},
              "timestamp": {{ now().timestamp() }}
            }
          ttl: "{{ vars.reset_timeout }}"
          conditions:
            - type: io.kestra.plugin.core.condition.Expression
              expression: "{{ outputs.check_if_should_trip.value }}"

        - id: log_failure
          type: io.kestra.plugin.core.log.Log
          message: >
            Service call failed. Failure count: {{ outputs.get_current_failure_count.value | int + 1 }}/{{ vars.failure_threshold }}
            {% if outputs.check_if_should_trip.value %}
            Circuit breaker TRIPPED to OPEN state
            {% endif %}

  - id: fallback_operation
    type: io.kestra.plugin.core.debug.Return
    format: "Using cached data or alternative service"
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: >
          {{ outputs.call_external_service.state.current == 'FAILED' or
             (outputs.check_circuit_breaker.value.state == 'open' and 
              not outputs.circuit_open_check_timeout.value) }}
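
The flow above spreads the state machine across cache reads, cache writes, and conditions. Stripped to its essentials, a circuit breaker is a small piece of state plus three transitions. A minimal in-memory sketch, with the failure threshold and reset timeout mirroring the flow's variables (the guarded call itself is a placeholder):

import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=300):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout  # seconds, i.e. PT5M
        self.failures = 0
        self.state = "closed"
        self.opened_at = 0.0

    def call(self, fn):
        if self.state == "open":
            if time.time() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open - skipping call")
            self.state = "half_open"  # cooldown elapsed: probe the service once
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = time.time()
            raise
        self.failures = 0
        self.state = "closed"
        return result

breaker = CircuitBreaker()
# breaker.call(lambda: call_unstable_endpoint())  # wrap each external call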

Advanced Circuit Breaker with Monitoring

id: monitored-circuit-breaker
namespace: acme.advanced

tasks:
  - id: circuit_breaker_monitor
    type: io.kestra.plugin.scripts.python.Script
    script: |
      import redis
      import json
      from datetime import datetime

      # Connect to shared state store
      r = redis.Redis(host='redis', port=6379, decode_responses=True)

      # Get all circuit breaker states
      circuit_keys = r.keys("circuit_breaker:*")
      circuits = []

      for key in circuit_keys:
          state = r.get(key)
          if state:
              circuits.append({
                  'service': key.replace('circuit_breaker:', ''),
                  'state': json.loads(state),
                  'age_seconds': datetime.now().timestamp() - json.loads(state).get('timestamp', 0)
              })

      # Generate health report
      report = {
          'timestamp': datetime.now().isoformat(),
          'total_circuits': len(circuits),
          'open_circuits': [c for c in circuits if c['state'].get('state') == 'open'],
          'half_open_circuits': [c for c in circuits if c['state'].get('state') == 'half_open'],
          'closed_circuits': [c for c in circuits if c['state'].get('state') == 'closed'],
          'health_score': len([c for c in circuits if c['state'].get('state') == 'closed']) / len(circuits) if circuits else 1.0
      }

      # Alert if health score drops below threshold
      if report['health_score'] < 0.8:
          print("ALERT: Circuit breaker health score below 80%")

      print(json.dumps(report, indent=2))

  - id: send_circuit_health_alert
    type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
    url: "{{ secret('SLACK_WEBHOOK_URL') }}"
    payload: |
      {
        "blocks": [
          {
            "type": "header",
            "text": {
              "type": "plain_text",
              "text": "âš¡ Circuit Breaker Health Report"
            }
          },
          {
            "type": "section",
            "text": {
              "type": "mrkdwn",
              "text": "*Health Score:* {{ outputs.circuit_breaker_monitor.vars.health_score | round(2) * 100 }}%\n*Open Circuits:* {{ outputs.circuit_breaker_monitor.vars.open_circuits | length }}\n*Total Circuits:* {{ outputs.circuit_breaker_monitor.vars.total_circuits }}"
            }
          }
        ]
      }
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ outputs.circuit_breaker_monitor.vars.health_score < 0.8 }}"

Pattern 3: Saga Pattern for Distributed Transactions

Implementing Compensating Transactions

The Saga pattern manages distributed transactions by breaking them into a sequence of local transactions, each paired with a compensating action that undoes it if a later step fails.

id: saga-pattern-order-processing
namespace: acme.ecommerce
description: Distributed transaction with compensation logic

inputs:
  - name: order_data
    type: JSON
    required: true

tasks:
  # SAGA: Main transaction sequence
  - id: saga_orchestrator
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      # Step 1: Reserve inventory (with compensation)
      - id: reserve_inventory
        type: io.kestra.plugin.core.http.Request
        uri: "https://inventory.acme.com/api/reserve"
        method: POST
        body: |
          {
            "items": {{ inputs.order_data.items | tojson }},
            "order_id": "{{ execution.id }}"
          }
        store: true
        allowFailure: false

      # Step 2: Process payment (with compensation)
      - id: process_payment
        type: io.kestra.plugin.core.http.Request
        uri: "https://payments.acme.com/api/charge"
        method: POST
        body: |
          {
            "order_id": "{{ execution.id }}",
            "amount": {{ inputs.order_data.total }},
            "payment_method": {{ inputs.order_data.payment_method | tojson }}
          }
        store: true
        allowFailure: false

      # Step 3: Schedule shipping (with compensation)
      - id: schedule_shipping
        type: io.kestra.plugin.core.http.Request
        uri: "https://shipping.acme.com/api/schedule"
        method: POST
        body: |
          {
            "order_id": "{{ execution.id }}",
            "address": {{ inputs.order_data.shipping_address | tojson }},
            "items": {{ inputs.order_data.items | tojson }}
          }
        store: true
        allowFailure: false

      # Step 4: Send confirmation (no compensation needed - idempotent)
      - id: send_confirmation
        type: io.kestra.plugin.core.http.Request
        uri: "https://notifications.acme.com/api/send"
        method: POST
        body: |
          {
            "order_id": "{{ execution.id }}",
            "customer_email": "{{ inputs.order_data.customer_email }}",
            "template": "order_confirmation"
          }
        store: true
        allowFailure: true  # Non-critical step

  # SAGA: Compensation handlers (run on failure)
  - id: compensation_orchestrator
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      # Compensate in reverse order
      - id: cancel_shipping_if_scheduled
        type: io.kestra.plugin.core.http.Request
        uri: "https://shipping.acme.com/api/cancel/{{ execution.id }}"
        method: DELETE
        conditions:
          - type: io.kestra.plugin.core.condition.Expression
            expression: >
              {{ outputs.saga_orchestrator.outputs.schedule_shipping != null and
                 outputs.saga_orchestrator.outputs.schedule_shipping.state.current == 'SUCCESS' }}

      - id: refund_payment_if_charged
        type: io.kestra.plugin.core.http.Request
        uri: "https://payments.acme.com/api/refund/{{ execution.id }}"
        method: POST
        conditions:
          - type: io.kestra.plugin.core.condition.Expression
            expression: >
              {{ outputs.saga_orchestrator.outputs.process_payment != null and
                 outputs.saga_orchestrator.outputs.process_payment.state.current == 'SUCCESS' }}

      - id: release_inventory_if_reserved
        type: io.kestra.plugin.core.http.Request
        uri: "https://inventory.acme.com/api/release/{{ execution.id }}"
        method: DELETE
        conditions:
          - type: io.kestra.plugin.core.condition.Expression
            expression: >
              {{ outputs.saga_orchestrator.outputs.reserve_inventory != null and
                 outputs.saga_orchestrator.outputs.reserve_inventory.state.current == 'SUCCESS' }}

    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ outputs.saga_orchestrator.state.current == 'FAILED' }}"

  # SAGA: State persistence
  - id: persist_saga_state
    type: io.kestra.plugin.jdbc.postgresql.Query
    sql: |
      INSERT INTO saga_states 
      (saga_id, execution_id, step, status, data, created_at, compensated)
      VALUES (
        'order_processing',
        '{{ execution.id }}',
        '{{ taskrun.id }}',
        '{{ taskrun.state.current }}',
        '{{ taskrun.value | tojson }}',
        NOW(),
        {{ outputs.compensation_orchestrator != null }}
      )
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "true"  # Always run after each task
    send:
      - saga_orchestrator
      - compensation_orchestrator
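
The property that makes the orchestrated saga work is that compensations run in reverse order, and only for the steps that actually succeeded. A minimal sketch of that bookkeeping (the do/undo callables are placeholders for the HTTP calls in the flow above):

def run_saga(steps):
    """steps: list of (do, undo) callables. Undo completed steps in reverse on failure."""
    completed = []
    try:
        for do, undo in steps:
            do()
            completed.append(undo)
    except Exception:
        # Compensate in reverse order, only for steps that completed.
        for undo in reversed(completed):
            undo()
        raise

run_saga([
    (lambda: print("reserve inventory"), lambda: print("release inventory")),
    (lambda: print("charge payment"),    lambda: print("refund payment")),
    (lambda: print("schedule shipping"), lambda: print("cancel shipping")),
])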

Choreographed Saga Pattern

id: choreographed-saga
namespace: acme.advanced
description: Event-driven saga pattern

tasks:
  - id: start_saga
    type: io.kestra.plugin.core.debug.Return
    format: "SAGA_STARTED:{{ execution.id }}"

  - id: publish_order_created
    type: io.kestra.plugin.kafka.Produce
    topic: "order-events"
    key: "{{ execution.id }}"
    value: |
      {
        "event_type": "ORDER_CREATED",
        "order_id": "{{ execution.id }}",
        "timestamp": "{{ now() }}",
        "data": {{ inputs.order_data | tojson }}
      }
    properties:
      bootstrap.servers: "kafka:9092"

  # Each service listens to events and publishes its own
  # This is more decoupled but harder to monitor

  - id: monitor_saga_completion
    type: io.kestra.plugin.kafka.Consume
    topic: "saga-completion-events"
    groupId: "saga-monitor-{{ execution.id }}"
    maxRecords: 1
    timeout: PT5M
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ outputs.publish_order_created.state.current == 'SUCCESS' }}"
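
In the choreographed variant, each participating service is its own small loop: consume an event, run a local transaction, publish the next event. A hedged sketch of what one participant (here, a payment service) might look like, assuming the kafka-python client and the topic and event names used above:

import json
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    "order-events",
    bootstrap_servers="kafka:9092",
    group_id="payment-service",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for message in consumer:
    event = message.value
    if event.get("event_type") != "ORDER_CREATED":
        continue
    # Local transaction: charge the payment (placeholder).
    charged = True
    next_event = "PAYMENT_COMPLETED" if charged else "PAYMENT_FAILED"
    producer.send("order-events", {
        "event_type": next_event,
        "order_id": event["order_id"],
    })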

Pattern 4: Event Sourcing with Kestra

Building Event-Sourced Systems

Event sourcing stores every state change as an immutable event; current state is rebuilt by replaying the event stream, which gives you audit trails and time-travel queries essentially for free.

id: event-sourced-shopping-cart
namespace: acme.ecommerce
description: Event-sourced shopping cart implementation

inputs:
  - name: cart_id
    type: STRING
    required: false
    defaults: "cart_{{ execution.id }}"
  - name: action
    type: STRING
    required: true
    choices: ["add_item", "remove_item", "update_quantity", "clear_cart", "checkout"]
  - name: item_data
    type: JSON
    required: false

tasks:
  # Validate action based on current state
  - id: load_current_state
    type: io.kestra.plugin.jdbc.postgresql.Query
    fetch: true
    sql: |
      WITH current_state AS (
        SELECT 
          cart_id,
          SUM(CASE WHEN event_type = 'ITEM_ADDED' THEN 1 ELSE 0 END) as items_added,
          SUM(CASE WHEN event_type = 'ITEM_REMOVED' THEN 1 ELSE 0 END) as items_removed,
          MAX(occurred_at) as last_event_at
        FROM cart_events
        WHERE cart_id = '{{ inputs.cart_id }}'
        GROUP BY cart_id
      ),
      last_checkout AS (
        SELECT MAX(occurred_at) as checkout_time
        FROM cart_events
        WHERE cart_id = '{{ inputs.cart_id }}'
          AND event_type = 'CHECKOUT_COMPLETED'
      )
      SELECT 
        cs.cart_id,
        cs.items_added - cs.items_removed as current_item_count,
        lc.checkout_time
      FROM current_state cs
      LEFT JOIN last_checkout lc ON 1=1
    ignoreMissing: true

  - id: validate_action
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ inputs.action }}"
    cases:
      checkout:
        - id: check_cart_not_empty
          type: io.kestra.plugin.core.condition.Expression
          expression: >
            {{ outputs.load_current_state.rows[0].current_item_count | default(0) > 0 }}
          errorMessage: "Cannot checkout empty cart"

      add_item:
        - id: validate_item_data
          type: io.kestra.plugin.core.condition.Expression
          expression: >
            {{ inputs.item_data != null and 
               inputs.item_data.item_id and 
               inputs.item_data.quantity > 0 }}
          errorMessage: "Invalid item data"

  # Store the event
  - id: store_event
    type: io.kestra.plugin.jdbc.postgresql.Query
    sql: |
      INSERT INTO cart_events
      (event_id, cart_id, event_type, event_data, occurred_at, version)
      VALUES (
        '{{ execution.id }}',
        '{{ inputs.cart_id }}',
        '{{ inputs.action | upper }}',
        '{{ inputs.item_data | default({}) | tojson }}',
        NOW(),
        COALESCE(
          (SELECT MAX(version) + 1 
           FROM cart_events 
           WHERE cart_id = '{{ inputs.cart_id }}'),
          1
        )
      )

  # Project current state from events
  - id: project_current_state
    type: io.kestra.plugin.scripts.python.Script
    description: "Project current cart state from event stream"
    script: |
      import json

      # Replay events to build current state
      cart_state = {
          'cart_id': '{{ inputs.cart_id }}',
          'items': {},
          'total_quantity': 0,
          'total_price': 0.0,
          'event_count': 0
      }

      # In a real implementation, you would fetch events from DB
      # For this example, we'll simulate event replay

      # Process based on action
      action = '{{ inputs.action }}'
      item_data = {{ inputs.item_data | default('{}') | tojson }}

      if action == 'add_item':
          item_id = item_data.get('item_id')
          if item_id:
              if item_id not in cart_state['items']:
                  cart_state['items'][item_id] = {
                      'quantity': 0,
                      'price': item_data.get('price', 0)
                  }
              cart_state['items'][item_id]['quantity'] += item_data.get('quantity', 1)
              cart_state['total_quantity'] += item_data.get('quantity', 1)
              cart_state['total_price'] += item_data.get('price', 0) * item_data.get('quantity', 1)

      elif action == 'remove_item':
          item_id = item_data.get('item_id')
          if item_id in cart_state['items']:
              quantity_to_remove = min(
                  item_data.get('quantity', 1),
                  cart_state['items'][item_id]['quantity']
              )
              cart_state['items'][item_id]['quantity'] -= quantity_to_remove
              cart_state['total_quantity'] -= quantity_to_remove
              cart_state['total_price'] -= cart_state['items'][item_id]['price'] * quantity_to_remove

              if cart_state['items'][item_id]['quantity'] <= 0:
                  del cart_state['items'][item_id]

      cart_state['event_count'] += 1

      print(json.dumps(cart_state, indent=2))

  # Publish event to event bus
  - id: publish_to_event_bus
    type: io.kestra.plugin.kafka.Produce
    topic: "cart-events"
    key: "{{ inputs.cart_id }}"
    value: |
      {
        "cart_id": "{{ inputs.cart_id }}",
        "event_type": "{{ inputs.action | upper }}",
        "event_data": {{ inputs.item_data | default({}) | tojson }},
        "timestamp": "{{ now() }}",
        "event_id": "{{ execution.id }}"
      }
    properties:
      bootstrap.servers: "kafka:9092"
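
The projection task above notes that a real implementation would replay the stored events rather than apply only the current action. A minimal replay sketch over a list of events shaped like the rows in cart_events (the sample events are made up for illustration):

def project_cart(events):
    """Fold a cart's event stream into its current state."""
    state = {"items": {}, "total_quantity": 0}
    for event in events:
        data = event["event_data"]
        if event["event_type"] == "ADD_ITEM":
            item = state["items"].setdefault(data["item_id"], {"quantity": 0})
            item["quantity"] += data.get("quantity", 1)
            state["total_quantity"] += data.get("quantity", 1)
        elif event["event_type"] == "REMOVE_ITEM":
            item = state["items"].get(data["item_id"])
            if item:
                removed = min(data.get("quantity", 1), item["quantity"])
                item["quantity"] -= removed
                state["total_quantity"] -= removed
                if item["quantity"] == 0:
                    del state["items"][data["item_id"]]
    return state

events = [
    {"event_type": "ADD_ITEM", "event_data": {"item_id": "sku-1", "quantity": 2}},
    {"event_type": "REMOVE_ITEM", "event_data": {"item_id": "sku-1", "quantity": 1}},
]
print(project_cart(events))  # {'items': {'sku-1': {'quantity': 1}}, 'total_quantity': 1}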

Time-Travel Query Pattern

id: time-travel-query
namespace: acme.advanced
description: Query historical state at a specific point in time

inputs:
  - name: entity_id
    type: STRING
    required: true
  - name: as_of_time
    type: DATETIME
    required: true

tasks:
  - id: replay_events_to_timestamp
    type: io.kestra.plugin.jdbc.postgresql.Query
    fetch: true
    sql: |
      SELECT 
        event_type,
        event_data,
        occurred_at,
        version
      FROM entity_events
      WHERE entity_id = '{{ inputs.entity_id }}'
        AND occurred_at <= '{{ inputs.as_of_time }}'
      ORDER BY version

  - id: reconstruct_historical_state
    type: io.kestra.plugin.scripts.python.Script
    inputFiles:
      events.json: |
        {{ outputs.replay_events_to_timestamp.rows | tojson }}
    script: |
      import json

      with open('events.json', 'r') as f:
          events = json.load(f)

      # Start with empty state
      state = {}

      # Replay events in order
      for event in events:
          event_type = event['event_type']
          event_data = event['event_data']

          if event_type == 'ENTITY_CREATED':
              state = event_data
          elif event_type == 'ENTITY_UPDATED':
              state.update(event_data)
          elif event_type == 'FIELD_SET':
              field_name = event_data['field']
              state[field_name] = event_data['value']
          elif event_type == 'FIELD_INCREMENTED':
              field_name = event_data['field']
              if field_name in state:
                  state[field_name] += event_data['amount']
              else:
                  state[field_name] = event_data['amount']
          # Add more event handlers as needed

      result = {
          'entity_id': '{{ inputs.entity_id }}',
          'as_of_time': '{{ inputs.as_of_time }}',
          'state': state,
          'events_replayed': len(events),
          'last_event_time': events[-1]['occurred_at'] if events else None
      }

      print(json.dumps(result, indent=2))

Pattern 5: CQRS (Command Query Responsibility Segregation)

Separating Read and Write Models

CQRS separates the write model (commands) from the read model (queries), so each side can be scaled and optimized independently.

id: cqrs-user-profile
namespace: acme.social
description: CQRS pattern for user profiles

inputs:
  - name: command
    type: STRING
    required: true
    choices: ["update_profile", "get_profile", "search_profiles"]
  - name: user_id
    type: STRING
    required: false
  - name: profile_data
    type: JSON
    required: false

tasks:
  # Command Side: Handle writes
  - id: handle_command
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ inputs.command }}"
    cases:
      update_profile:
        - id: validate_profile_data
          type: io.kestra.plugin.core.condition.Expression
          expression: >
            {{ inputs.profile_data != null and 
               inputs.user_id != null }}
          errorMessage: "Missing profile data or user_id"

        - id: store_write_event
          type: io.kestra.plugin.jdbc.postgresql.Query
          sql: |
            INSERT INTO user_profile_events
            (event_id, user_id, event_type, event_data, occurred_at)
            VALUES (
              '{{ execution.id }}',
              '{{ inputs.user_id }}',
              'PROFILE_UPDATED',
              '{{ inputs.profile_data | tojson }}',
              NOW()
            )

        - id: update_read_model_async
          type: io.kestra.plugin.core.flow.Subflow
          namespace: acme.background
          flowId: update-profile-read-model
          wait: false  # Fire and forget - eventual consistency
          transmit:
            - name: user_id
              value: "{{ inputs.user_id }}"
            - name: profile_data
              value: "{{ inputs.profile_data | tojson }}"

        - id: invalidate_cache
          type: io.kestra.plugin.redis.Delete
          key: "profile:{{ inputs.user_id }}"
          host: "redis"
          port: 6379

        - id: send_profile_updated_event
          type: io.kestra.plugin.kafka.Produce
          topic: "user-profile-events"
          key: "{{ inputs.user_id }}"
          value: |
            {
              "event_type": "PROFILE_UPDATED",
              "user_id": "{{ inputs.user_id }}",
              "timestamp": "{{ now() }}",
              "event_id": "{{ execution.id }}"
            }

  # Query Side: Handle reads (optimized for reading)
  - id: handle_query
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ inputs.command }}"
    cases:
      get_profile:
        - id: try_cache_first
          type: io.kestra.plugin.redis.Get
          key: "profile:{{ inputs.user_id }}"
          host: "redis"
          port: 6379

        - id: cache_hit
          type: io.kestra.plugin.core.debug.Return
          format: "{{ outputs.try_cache_first.value }}"
          conditions:
            - type: io.kestra.plugin.core.condition.Expression
              expression: "{{ outputs.try_cache_first.value != null }}"

        - id: cache_miss_query
          type: io.kestra.plugin.jdbc.postgresql.Query
          fetch: true
          sql: |
            SELECT 
              user_id,
              display_name,
              avatar_url,
              bio,
              location,
              website,
              follower_count,
              following_count,
              post_count,
              last_updated
            FROM user_profiles_read
            WHERE user_id = '{{ inputs.user_id }}'
          conditions:
            - type: io.kestra.plugin.core.condition.Expression
              expression: "{{ outputs.try_cache_first.value == null }}"

        - id: populate_cache
          type: io.kestra.plugin.redis.Put
          key: "profile:{{ inputs.user_id }}"
          value: |
            {{ outputs.cache_miss_query.rows[0] | tojson }}
          ttl: PT1H
          host: "redis"
          port: 6379
          conditions:
            - type: io.kestra.plugin.core.condition.Expression
              expression: >
                {{ outputs.cache_miss_query.rows | length > 0 }}

        - id: return_profile
          type: io.kestra.plugin.core.debug.Return
          format: >
            {{ outputs.cache_miss_query.rows[0] | tojson if outputs.cache_miss_query.rows else '{}' }}
          conditions:
            - type: io.kestra.plugin.core.condition.Expression
              expression: "{{ outputs.try_cache_first.value == null }}"

      search_profiles:
        - id: search_read_model
          type: io.kestra.plugin.jdbc.elasticsearch.Search
          connection:
            hosts: ["elasticsearch:9200"]
          index: "user_profiles"
          query: |
            {
              "query": {
                "multi_match": {
                  "query": "{{ inputs.profile_data.query }}",
                  "fields": ["display_name^2", "bio", "location"]
                }
              },
              "size": 20
            }

        - id: return_search_results
          type: io.kestra.plugin.core.debug.Return
          format: >
            {{ outputs.search_read_model.hits | tojson }}
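
The query side above is a classic cache-aside read: try Redis, fall back to the read-model table on a miss, then repopulate the cache. The same logic as a compact sketch, assuming the redis-py client and Redis host from the flow; get_profile_from_read_model is a hypothetical stand-in for the SQL query:

import json
import redis

r = redis.Redis(host="redis", port=6379, decode_responses=True)

def get_profile_from_read_model(user_id: str) -> dict:
    # Placeholder for the SELECT ... FROM user_profiles_read query above.
    return {"user_id": user_id, "display_name": "unknown"}

def get_profile(user_id: str, ttl_seconds: int = 3600) -> dict:
    cache_key = f"profile:{user_id}"
    cached = r.get(cache_key)
    if cached is not None:
        return json.loads(cached)  # cache hit
    profile = get_profile_from_read_model(user_id)  # cache miss: hit the read model
    if profile:
        r.set(cache_key, json.dumps(profile), ex=ttl_seconds)  # repopulate the cache
    return profile or {}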

Read Model Update Processor

id: update-profile-read-model
namespace: acme.background
description: Background processor to update read models

tasks:
  - id: listen_for_profile_events
    type: io.kestra.plugin.kafka.Consume
    topic: "user-profile-events"
    groupId: "read-model-updater"
    maxRecords: 100
    timeout: PT10S

  - id: process_events_batch
    type: io.kestra.plugin.core.flow.EachSequential
    value: "{{ outputs.listen_for_profile_events.messages }}"
    tasks:
      - id: "process_event_{{ taskloop.index }}"
        type: io.kestra.plugin.core.flow.Switch
        value: "{{ task.value.value.event_type }}"
        cases:
          PROFILE_UPDATED:
            - id: update_read_model
              type: io.kestra.plugin.jdbc.postgresql.Query
              sql: |
                INSERT INTO user_profiles_read
                (user_id, display_name, avatar_url, bio, location, website, last_updated)
                VALUES (
                  '{{ task.value.value.user_id }}',
                  '{{ task.value.value.event_data.display_name | default('') }}',
                  '{{ task.value.value.event_data.avatar_url | default('') }}',
                  '{{ task.value.value.event_data.bio | default('') }}',
                  '{{ task.value.value.event_data.location | default('') }}',
                  '{{ task.value.value.event_data.website | default('') }}',
                  NOW()
                )
                ON CONFLICT (user_id) DO UPDATE SET
                  display_name = EXCLUDED.display_name,
                  avatar_url = EXCLUDED.avatar_url,
                  bio = EXCLUDED.bio,
                  location = EXCLUDED.location,
                  website = EXCLUDED.website,
                  last_updated = NOW()

            - id: update_search_index
              type: io.kestra.plugin.jdbc.elasticsearch.Index
              connection:
                hosts: ["elasticsearch:9200"]
              index: "user_profiles"
              id: "{{ task.value.value.user_id }}"
              document: |
                {
                  "user_id": "{{ task.value.value.user_id }}",
                  "display_name": "{{ task.value.value.event_data.display_name | default('') }}",
                  "bio": "{{ task.value.value.event_data.bio | default('') }}",
                  "location": "{{ task.value.value.event_data.location | default('') }}",
                  "last_updated": "{{ now() }}"
                }

          FOLLOW_ADDED:
            - id: increment_follower_count
              type: io.kestra.plugin.jdbc.postgresql.Query
              sql: |
                UPDATE user_profiles_read
                SET follower_count = follower_count + 1
                WHERE user_id = '{{ task.value.value.target_user_id }}'

            - id: increment_following_count
              type: io.kestra.plugin.jdbc.postgresql.Query
              sql: |
                UPDATE user_profiles_read
                SET following_count = following_count + 1
                WHERE user_id = '{{ task.value.value.source_user_id }}'

  - id: commit_kafka_offsets
    type: io.kestra.plugin.kafka.Consume
    topic: "user-profile-events"
    groupId: "read-model-updater"
    maxRecords: 0  # Just commit offsets
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ outputs.listen_for_profile_events.messages | length > 0 }}"

Pattern 6: Strangler Fig Pattern

Incremental Migration Strategy

The Strangler Fig pattern replaces a legacy system incrementally: new functionality is built alongside the old system, and traffic is shifted over in phases until the legacy system can be retired.

id: strangler-fig-migration
namespace: acme.migration
description: Incrementally migrate from legacy to new system

variables:
  migration_phase: "parallel_run"  # Options: shadow, parallel_run, cutover
  traffic_percentage: 50  # Percentage of traffic to send to new system

tasks:
  # Feature Flag: Route traffic based on migration phase
  - id: route_traffic
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ vars.migration_phase }}"
    cases:
      shadow:
        # Shadow mode: Send to both, compare results
        - id: call_legacy_system
          type: io.kestra.plugin.core.http.Request
          uri: "https://legacy-api.acme.com/process"
          method: POST
          body: "{{ inputs.request_body }}"
          store: true

        - id: call_new_system_shadow
          type: io.kestra.plugin.core.http.Request
          uri: "https://new-api.acme.com/process"
          method: POST
          body: "{{ inputs.request_body }}"
          store: true

        - id: compare_results
          type: io.kestra.plugin.scripts.python.Script
          inputFiles:
            legacy_response.json: "{{ outputs.call_legacy_system.uri }}"
            new_response.json: "{{ outputs.call_new_system_shadow.uri }}"
          script: |
            import json
            import difflib

            with open('legacy_response.json', 'r') as f:
                legacy = json.load(f)
            with open('new_response.json', 'r') as f:
                new = json.load(f)

            # Compare results
            comparison = {
                'match': legacy == new,
                'legacy_response': legacy,
                'new_response': new,
                'differences': []
            }

            if legacy != new:
                # Find specific differences
                legacy_str = json.dumps(legacy, sort_keys=True, indent=2)
                new_str = json.dumps(new, sort_keys=True, indent=2)

                diff = list(difflib.unified_diff(
                    legacy_str.splitlines(),
                    new_str.splitlines(),
                    lineterm=''
                ))

                comparison['differences'] = diff[:10]  # First 10 differences

            print(json.dumps(comparison, indent=2))

        - id: return_legacy_response
          type: io.kestra.plugin.core.debug.Return
          format: "{{ outputs.call_legacy_system.body | tojson }}"

      parallel_run:
        # Parallel run: Split traffic between systems
        - id: decide_routing
          type: io.kestra.plugin.core.debug.Return
          format: >
            {{ random() * 100 <= vars.traffic_percentage }}

        - id: call_new_system_parallel
          type: io.kestra.plugin.core.http.Request
          uri: "https://new-api.acme.com/process"
          method: POST
          body: "{{ inputs.request_body }}"
          store: true
          conditions:
            - type: io.kestra.plugin.core.condition.Expression
              expression: "{{ outputs.decide_routing.value == true }}"

        - id: call_legacy_system_fallback
          type: io.kestra.plugin.core.http.Request
          uri: "https://legacy-api.acme.com/process"
          method: POST
          body: "{{ inputs.request_body }}"
          store: true
          conditions:
            - type: io.kestra.plugin.core.condition.Expression
              expression: "{{ outputs.decide_routing.value == false }}"

        - id: return_response
          type: io.kestra.plugin.core.flow.Switch
          value: "{{ outputs.decide_routing.value }}"
          cases:
            true:
              - id: return_new_response
                type: io.kestra.plugin.core.debug.Return
                format: "{{ outputs.call_new_system_parallel.body | tojson }}"
            false:
              - id: return_legacy_response_fallback
                type: io.kestra.plugin.core.debug.Return
                format: "{{ outputs.call_legacy_system_fallback.body | tojson }}"

      cutover:
        # Cutover: All traffic to new system
        - id: call_new_system_primary
          type: io.kestra.plugin.core.http.Request
          uri: "https://new-api.acme.com/process"
          method: POST
          body: "{{ inputs.request_body }}"
          store: true

        - id: return_new_response_primary
          type: io.kestra.plugin.core.debug.Return
          format: "{{ outputs.call_new_system_primary.body | tojson }}"

        - id: legacy_system_backup
          type: io.kestra.plugin.core.http.Request
          uri: "https://legacy-api.acme.com/process"
          method: POST
          body: "{{ inputs.request_body }}"
          store: true
          allowFailure: true  # Legacy might be turned off

  # Migration Monitoring
  - id: track_migration_metrics
    type: io.kestra.plugin.jdbc.postgresql.Query
    sql: |
      INSERT INTO migration_metrics
      (migration_phase, traffic_percentage, request_count, 
       success_count, error_count, avg_response_time, timestamp)
      SELECT 
        '{{ vars.migration_phase }}',
        {{ vars.traffic_percentage }},
        COUNT(*) as request_count,
        SUM(CASE WHEN status_code = 200 THEN 1 ELSE 0 END) as success_count,
        SUM(CASE WHEN status_code != 200 THEN 1 ELSE 0 END) as error_count,
        AVG(response_time_ms) as avg_response_time,
        NOW()
      FROM api_requests
      WHERE timestamp >= NOW() - INTERVAL '1 minute'
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ vars.migration_phase != 'completed' }}"
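
The parallel_run phase above routes each request at random, so a retried request may land on a different system. If you need the same request to always hit the same system, one common refinement (not shown in the flow) is deterministic routing on a stable key; a minimal sketch, assuming a request identifier is available to hash:

import hashlib

def route_to_new_system(request_id: str, traffic_percentage: int) -> bool:
    """Deterministically send a fixed share of traffic to the new system."""
    digest = hashlib.sha256(request_id.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % 100  # stable bucket in [0, 100)
    return bucket < traffic_percentage

print(route_to_new_system("order-123", 50))  # same answer on every retry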

Data Migration with Dual Write

id: dual-write-migration
namespace: acme.migration
description: Dual write to both old and new databases during migration

tasks:
  - id: write_to_both_systems
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: write_to_legacy
        type: io.kestra.plugin.jdbc.postgresql.Query
        url: "jdbc:postgresql://legacy-db.acme.com:5432/app"
        username: "{{ secret('LEGACY_DB_USER') }}"
        password: "{{ secret('LEGACY_DB_PASSWORD') }}"
        sql: |
          INSERT INTO orders 
          (order_id, customer_id, amount, status, created_at)
          VALUES (
            '{{ inputs.order_id }}',
            '{{ inputs.customer_id }}',
            {{ inputs.amount }},
            'pending',
            NOW()
          )
        allowFailure: false

      - id: write_to_new
        type: io.kestra.plugin.jdbc.postgresql.Query
        url: "jdbc:postgresql://new-db.acme.com:5432/app"
        username: "{{ secret('NEW_DB_USER') }}"
        password: "{{ secret('NEW_DB_PASSWORD') }}"
        sql: |
          INSERT INTO orders 
          (order_id, customer_id, amount, status, created_at, metadata)
          VALUES (
            '{{ inputs.order_id }}',
            '{{ inputs.customer_id }}',
            {{ inputs.amount }},
            'pending',
            NOW(),
            '{"migration_phase": "dual_write"}'
          )
        allowFailure: true  # New system might have issues

  - id: verify_data_consistency
    type: io.kestra.plugin.scripts.python.Script
    description: "Verify data was written to both systems consistently"
    script: |
      import psycopg2
      import json

      def query_db(host, dbname, user, password, query):
          conn = psycopg2.connect(
              host=host, dbname=dbname, user=user, password=password
          )
          cur = conn.cursor()
          cur.execute(query)
          result = cur.fetchone()
          cur.close()
          conn.close()
          return result

      # Query both systems
      # Templating fills in the order id before Python runs, so plain strings are enough
      # (an f-string here would try to evaluate the rendered id as a Python expression).
      legacy_query = """
          SELECT order_id, customer_id, amount, status
          FROM orders
          WHERE order_id = '{{ inputs.order_id }}'
      """

      new_query = """
          SELECT order_id, customer_id, amount, status
          FROM orders
          WHERE order_id = '{{ inputs.order_id }}'
      """

      try:
          legacy_result = query_db(
              'legacy-db.acme.com', 'app',
              '{{ secret('LEGACY_DB_USER') }}', '{{ secret('LEGACY_DB_PASSWORD') }}',
              legacy_query
          )
      except Exception as e:
          legacy_result = None
          print(f"Legacy query failed: {e}")

      try:
          new_result = query_db(
              'new-db.acme.com', 'app',
              '{{ secret('NEW_DB_USER') }}', '{{ secret('NEW_DB_PASSWORD') }}',
              new_query
          )
      except Exception as e:
          new_result = None
          print(f"New query failed: {e}")

      # Compare results
      comparison = {
          'order_id': '{{ inputs.order_id }}',
          'legacy_found': legacy_result is not None,
          'new_found': new_result is not None,
          'consistent': legacy_result == new_result,
          'legacy_data': legacy_result,
          'new_data': new_result
      }

      print(json.dumps(comparison, indent=2))

      if not comparison['consistent']:
          # Log inconsistency for manual review
          print("WARNING: Data inconsistency detected!")

  - id: handle_inconsistency
    type: io.kestra.plugin.core.flow.Subflow
    namespace: acme.reconciliation
    flowId: reconcile-dual-write
    wait: false  # Async reconciliation
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: >
          {{ outputs.verify_data_consistency.vars.consistent == false }}

Pattern 7: Sidecar Pattern for Cross-Cutting Concerns

Implementing Cross-Cutting Logic

The sidecar pattern attaches supporting functionality to the main workflow without modifying its core logic.

id: main-business-workflow
namespace: acme.business
description: Main business logic with sidecar attachments

tasks:
  - id: business_logic
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: step1
        type: io.kestra.plugin.core.log.Log
        message: "Business step 1"

      - id: step2
        type: io.kestra.plugin.core.debug.Return
        format: "Business step 2 result"

      - id: step3
        type: io.kestra.plugin.core.log.Log
        message: "Business step 3 with result: {{ outputs.step2.value }}"

# Sidecar flow that can be attached to any workflow
id: monitoring-sidecar
namespace: acme.sidecars
description: Attachable monitoring and observability sidecar

tasks:
  - id: start_monitoring
    type: io.kestra.plugin.core.debug.Return
    format: "MONITORING_START:{{ parent.execution.id }}"

  - id: collect_metrics
    type: io.kestra.plugin.core.http.Request
    uri: "http://metrics-collector.acme.com/api/metrics"
    method: POST
    body: |
      {
        "execution_id": "{{ parent.execution.id }}",
        "flow_id": "{{ parent.flow.id }}",
        "namespace": "{{ parent.namespace.id }}",
        "start_time": "{{ parent.execution.startDate }}",
        "metrics": {
          "task_count": {{ parent.tasks | length }},
          "inputs": {{ parent.inputs | default({}) | tojson }}
        }
      }
    store: true

  - id: log_performance
    type: io.kestra.plugin.core.log.Log
    message: |
      Sidecar monitoring for {{ parent.execution.id }}
      Flow: {{ parent.flow.id }}
      Tasks monitored: {{ parent.tasks | length }}

Composed Workflow with Sidecars

id: composed-workflow-with-sidecars
namespace: acme.composed
description: Compose main workflow with multiple sidecars

tasks:
  # Attach monitoring sidecar
  - id: attach_monitoring
    type: io.kestra.plugin.core.flow.Subflow
    namespace: acme.sidecars
    flowId: monitoring-sidecar
    wait: false  # Run alongside

  # Attach auditing sidecar
  - id: attach_auditing
    type: io.kestra.plugin.core.flow.Subflow
    namespace: acme.sidecars
    flowId: auditing-sidecar
    wait: false

  # Main business logic
  - id: main_business_process
    type: io.kestra.plugin.core.flow.Subflow
    namespace: acme.business
    flowId: main-business-workflow
    wait: true

  # Attach cleanup sidecar (runs after main process)
  - id: attach_cleanup
    type: io.kestra.plugin.core.flow.Subflow
    namespace: acme.sidecars
    flowId: cleanup-sidecar
    wait: false
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: >
          {{ outputs.main_business_process.state.current in ['SUCCESS', 'FAILED'] }}

Pattern 8: Pipeline Pattern with Middleware

Building Processing Pipelines

id: data-processing-pipeline
namespace: acme.pipelines
description: Configurable pipeline with middleware support

variables:
  pipeline_stages: ["validate", "transform", "enrich", "filter", "output"]
  middleware_enabled: ["logging", "metrics", "validation", "caching"]

tasks:
  # Pipeline orchestrator
  - id: pipeline_orchestrator
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: load_data
        type: io.kestra.plugin.core.http.Download
        uri: "https://api.acme.com/data"
        store: true

      # Dynamic pipeline stages
      - id: execute_pipeline_stages
        type: io.kestra.plugin.core.flow.EachSequential
        value: "{{ vars.pipeline_stages }}"
        tasks:
          - id: "execute_{{ task.value }}"
            type: io.kestra.plugin.core.flow.Switch
            value: "{{ task.value }}"
            cases:
              validate:
                - id: run_validation
                  type: io.kestra.plugin.core.flow.Subflow
                  namespace: acme.pipeline.middleware
                  flowId: validation-middleware
                  wait: true
                  transmit:
                    - name: data
                      value: "{{ outputs.load_data.uri }}"

              transform:
                - id: run_transformation
                  type: io.kestra.plugin.scripts.python.Script
                  inputFiles:
                    data.json: "{{ outputs.execute_validation.outputs.validated_data | default(outputs.load_data.uri) }}"
                  script: |
                    import json
                    import pandas as pd

                    with open('data.json', 'r') as f:
                        data = json.load(f)

                    # Transformation logic
                    df = pd.DataFrame(data)
                    # ... transformation steps

                    df.to_parquet('transformed.parquet', index=False)
                    print("Transformation complete")

              enrich:
                - id: run_enrichment
                  type: io.kestra.plugin.core.flow.Subflow
                  namespace: acme.pipeline.middleware
                  flowId: enrichment-middleware
                  wait: true

              filter:
                - id: run_filtering
                  type: io.kestra.plugin.scripts.python.Script
                  script: |
                    # Filtering logic
                    print("Filtering complete")

              output:
                - id: produce_output
                  type: io.kestra.plugin.core.debug.Return
                  format: "Pipeline execution complete"

          # Apply middleware after each stage
          - id: "apply_middleware_{{ task.value }}"
            type: io.kestra.plugin.core.flow.Parallel
            tasks:
              - id: "logging_middleware_{{ task.value }}"
                type: io.kestra.plugin.core.flow.Subflow
                namespace: acme.pipeline.middleware
                flowId: logging-middleware
                wait: false
                transmit:
                  - name: stage
                    value: "{{ task.value }}"
                  - name: execution_id
                    value: "{{ execution.id }}"
                conditions:
                  - type: io.kestra.plugin.core.condition.Expression
                    expression: "'logging' in vars.middleware_enabled"

              - id: "metrics_middleware_{{ task.value }}"
                type: io.kestra.plugin.core.flow.Subflow
                namespace: acme.pipeline.middleware
                flowId: metrics-middleware
                wait: false
                conditions:
                  - type: io.kestra.plugin.core.condition.Expression
                    expression: "'metrics' in vars.middleware_enabled"

Middleware Implementation

id: logging-middleware
namespace: acme.pipeline.middleware
description: Logging middleware for pipeline stages

tasks:
  - id: log_stage_execution
    type: io.kestra.plugin.jdbc.postgresql.Query
    sql: |
      INSERT INTO pipeline_logs
      (execution_id, stage, status, start_time, end_time, duration_ms)
      VALUES (
        '{{ inputs.execution_id }}',
        '{{ inputs.stage }}',
        '{{ parent.state.current }}',
        '{{ parent.startDate }}',
        '{{ now() }}',
        EXTRACT(EPOCH FROM (NOW() - '{{ parent.startDate }}')) * 1000
      )
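
Conceptually, the middleware layer wraps every pipeline stage with the same cross-cutting behavior. A minimal sketch of that wrapping in plain Python (the stage functions and the logging middleware are placeholders for the subflows above):

import time

def logging_middleware(stage_name, stage_fn):
    def wrapped(data):
        start = time.time()
        result = stage_fn(data)
        print(f"[{stage_name}] took {time.time() - start:.3f}s")
        return result
    return wrapped

def run_pipeline(data, stages, middlewares):
    # Wrap each stage with every enabled middleware, then run the stages in order.
    for name, fn in stages:
        for mw in middlewares:
            fn = mw(name, fn)
        data = fn(data)
    return data

stages = [
    ("validate", lambda d: d),
    ("transform", lambda d: [x * 2 for x in d]),
    ("output", lambda d: d),
]
print(run_pipeline([1, 2, 3], stages, [logging_middleware]))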

Pattern 9: Bulkhead Pattern for Fault Isolation

Isolating Failures with Resource Pools

id: bulkhead-pattern
namespace: acme.resilience
description: Isolate failures using resource pools

variables:
  # Different resource pools for different services
  resource_pools:
    payment_service:
      max_concurrent: 5
      timeout: PT30S
    inventory_service:
      max_concurrent: 3
      timeout: PT20S
    shipping_service:
      max_concurrent: 2
      timeout: PT60S

tasks:
  # Resource pool manager
  - id: acquire_resource_slot
    type: io.kestra.plugin.core.cache.Increment
    key: "resource_pool:{{ inputs.service_name }}:active"
    defaultValue: 0
    ttl: PT5M

  - id: check_pool_capacity
    type: io.kestra.plugin.core.condition.Expression
    expression: >
      {{ outputs.acquire_resource_slot.value <= 
         vars.resource_pools[inputs.service_name].max_concurrent }}

  - id: execute_with_resource_limit
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ inputs.service_name }}"
    cases:
      payment_service:
        - id: call_payment_service
          type: io.kestra.plugin.core.http.Request
          uri: "https://payments.acme.com/api/process"
          method: POST
          body: "{{ inputs.request_data }}"
          timeout: "{{ vars.resource_pools.payment_service.timeout }}"
          conditions:
            - type: io.kestra.plugin.core.condition.Expression
              expression: "{{ outputs.check_pool_capacity.value }}"

      inventory_service:
        - id: call_inventory_service
          type: io.kestra.plugin.core.http.Request
          uri: "https://inventory.acme.com/api/check"
          method: POST
          body: "{{ inputs.request_data }}"
          timeout: "{{ vars.resource_pools.inventory_service.timeout }}"
          conditions:
            - type: io.kestra.plugin.core.condition.Expression
              expression: "{{ outputs.check_pool_capacity.value }}"

  - id: pool_capacity_exceeded
    type: io.kestra.plugin.core.debug.Return
    format: "Resource pool capacity exceeded for {{ inputs.service_name }}"
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ not outputs.check_pool_capacity.value }}"

  # Release resource slot
  - id: release_resource_slot
    type: io.kestra.plugin.core.cache.Decrement
    key: "resource_pool:{{ inputs.service_name }}:active"
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: >
          {{ outputs.execute_with_resource_limit != null or
             outputs.pool_capacity_exceeded != null }}
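
The bulkhead's guarantee is simply that each downstream service gets its own bounded pool, so one slow or failing service cannot exhaust the capacity the others need. A compact sketch with per-service semaphores, with pool sizes mirroring the resource_pools variable above (call_service is a placeholder for the HTTP request):

import threading

# One bounded pool per downstream service, mirroring the resource_pools variable.
POOLS = {
    "payment_service": threading.BoundedSemaphore(5),
    "inventory_service": threading.BoundedSemaphore(3),
    "shipping_service": threading.BoundedSemaphore(2),
}

def call_service(service_name: str, request_data: dict):
    # Placeholder for the actual HTTP call.
    return {"service": service_name, "ok": True}

def call_with_bulkhead(service_name: str, request_data: dict):
    pool = POOLS[service_name]
    if not pool.acquire(blocking=False):
        # Pool exhausted: fail fast instead of queuing behind a struggling service.
        raise RuntimeError(f"resource pool capacity exceeded for {service_name}")
    try:
        return call_service(service_name, request_data)
    finally:
        pool.release()

print(call_with_bulkhead("payment_service", {"amount": 10}))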

Pattern 10: Ambassador Pattern for Service Abstraction

Abstracting External Services

id: payment-service-ambassador
namespace: acme.ambassadors
description: Ambassador pattern for payment service abstraction

tasks:
  # Service discovery and health check
  - id: discover_payment_endpoints
    type: io.kestra.plugin.core.http.Request
    uri: "https://service-discovery.acme.com/api/endpoints/payment"
    method: GET
    store: true

  - id: select_healthy_endpoint
    type: io.kestra.plugin.scripts.python.Script
    inputFiles:
      endpoints.json: "{{ outputs.discover_payment_endpoints.uri }}"
    script: |
      import json
      import requests
      from concurrent.futures import ThreadPoolExecutor, as_completed

      with open('endpoints.json', 'r') as f:
          endpoints = json.load(f)

      def check_endpoint_health(endpoint):
          try:
              response = requests.get(
                  f"{endpoint['url']}/health",
                  timeout=2
              )
              return {
                  'endpoint': endpoint,
                  'healthy': response.status_code == 200,
                  'response_time': response.elapsed.total_seconds()
              }
          except:
              return {
                  'endpoint': endpoint,
                  'healthy': False,
                  'response_time': float('inf')
              }

      # Check all endpoints in parallel
      with ThreadPoolExecutor(max_workers=5) as executor:
          futures = [executor.submit(check_endpoint_health, ep) for ep in endpoints]
          results = [f.result() for f in as_completed(futures)]

      # Select the healthiest endpoint
      healthy_endpoints = [r for r in results if r['healthy']]
      if healthy_endpoints:
          # Choose the fastest healthy endpoint
          selected = min(healthy_endpoints, key=lambda x: x['response_time'])
          print(json.dumps(selected['endpoint']))
      else:
          raise Exception("No healthy payment endpoints available")

  # Circuit breaker and retry logic
  - id: call_payment_service_with_retry
    type: io.kestra.plugin.core.http.Request
    uri: "{{ outputs.select_healthy_endpoint.vars.url }}/api/process"
    method: POST
    body: "{{ inputs.payment_data }}"
    retry:
      type: exponential
      maxAttempt: 3
      interval: PT1S
    timeout: PT10S

  # Response transformation
  - id: transform_payment_response
    type: io.kestra.plugin.scripts.python.Script
    inputFiles:
      response.json: "{{ outputs.call_payment_service_with_retry.uri }}"
    script: |
      import json

      with open('response.json', 'r') as f:
          response = json.load(f)

      # Standardize response format
      standardized = {
          'success': response.get('status') == 'success',
          'transaction_id': response.get('id'),
          'amount': response.get('amount'),
          'currency': response.get('currency', 'USD'),
          'timestamp': response.get('created'),
          'provider': 'payment_service',
          'metadata': {
              'original_response': response
          }
      }

      print(json.dumps(standardized, indent=2))

  # Cache successful responses
  - id: cache_payment_response
    type: io.kestra.plugin.redis.Put
    key: "payment:{{ inputs.payment_data.transaction_id }}"
    value: "{{ outputs.transform_payment_response.vars }}"
    ttl: PT5M
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ outputs.transform_payment_response.vars.success == true }}"

Putting It All Together: A Complete Advanced Workflow

id: advanced-order-processing
namespace: acme.production
description: Complete advanced workflow combining multiple patterns

tasks:
  # Pattern 1: Circuit Breaker for external services
  - id: check_service_health
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: check_payment_service
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.resilience
        flowId: circuit-breaker-check
        wait: true
        inputs:
          service_name: "payment_service"

      - id: check_inventory_service
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.resilience
        flowId: circuit-breaker-check
        wait: true
        inputs:
          service_name: "inventory_service"

  # Pattern 2: Bulkhead for resource isolation
  - id: process_with_bulkheads
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: validate_order_bulkhead
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.resilience
        flowId: bulkhead-processor
        wait: true
        inputs:
          service_name: "validation"
          request_data: "{{ inputs.order_data }}"

      - id: reserve_inventory_bulkhead
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.resilience
        flowId: bulkhead-processor
        wait: true
        inputs:
          service_name: "inventory"
          request_data: "{{ inputs.order_data }}"
        conditions:
          - type: io.kestra.plugin.core.condition.Expression
            expression: >
              {{ outputs.check_service_health.outputs.check_inventory_service.outputs.circuit_state != 'open' }}

  # Pattern 3: Saga pattern for distributed transaction
  - id: execute_order_saga
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: saga_step1_payment
        type: io.kestra.plugin.core.flow.Subflow
        namespace: acme.ambassadors
        flowId: payment-service-ambassador
        wait: true
        inputs:
          payment_data: "{{ inputs.payment_data }}"

      - id: saga_step2_fulfillment
        type: io.kestra.plugin.core.http.Request
        uri: "https://fulfillment.acme.com/api/process"
        method: POST
        body: |
          {
            "order_id": "{{ execution.id }}",
            "items": {{ inputs.order_data.items | tojson }},
            "payment_reference": "{{ outputs.saga_step1_payment.outputs.transaction_id }}"
          }

      - id: saga_step3_notification
        type: io.kestra.plugin.core.flow.Parallel
        tasks:
          - id: notify_customer
            type: io.kestra.plugin.notifications.email.EmailSend
            to: "{{ inputs.customer_email }}"
            subject: "Order Confirmation #{{ execution.id }}"
            htmlBody: |
              <h1>Thank you for your order!</h1>
              <p>Order ID: {{ execution.id }}</p>

          - id: notify_internal
            type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
            url: "{{ secret('SLACK_ORDERS_WEBHOOK') }}"
            payload: |
              {
                "text": "New order processed: {{ execution.id }}"
              }

  # Pattern 4: Event sourcing for audit trail
  - id: record_order_events
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: record_order_created
        type: io.kestra.plugin.jdbc.postgresql.Query
        sql: |
          INSERT INTO order_events
          (event_id, order_id, event_type, event_data, occurred_at)
          VALUES (
            '{{ execution.id }}_created',
            '{{ execution.id }}',
            'ORDER_CREATED',
            '{{ inputs.order_data | tojson }}',
            NOW()
          )

      - id: record_order_processing
        type: io.kestra.plugin.jdbc.postgresql.Query
        sql: |
          INSERT INTO order_events
          (event_id, order_id, event_type, event_data, occurred_at)
          VALUES (
            '{{ execution.id }}_processing',
            '{{ execution.id }}',
            'ORDER_PROCESSING',
            '{{ outputs.execute_order_saga | tojson }}',
            NOW()
          )

  # Pattern 5: CQRS update for read models
  - id: update_read_models
    type: io.kestra.plugin.core.flow.Parallel
    wait: false  # Async updates
    tasks:
      - id: update_order_summary
        type: io.kestra.plugin.jdbc.postgresql.Query
        sql: |
          INSERT INTO order_summaries
          (order_id, customer_id, total_amount, status, items_count, created_at)
          VALUES (
            '{{ execution.id }}',
            '{{ inputs.customer_id }}',
            {{ inputs.order_data.total }},
            'processed',
            {{ inputs.order_data.items | length }},
            NOW()
          )

      - id: update_customer_metrics
        type: io.kestra.plugin.jdbc.postgresql.Query
        sql: |
          UPDATE customer_metrics
          SET 
            total_orders = total_orders + 1,
            total_spent = total_spent + {{ inputs.order_data.total }},
            last_order_at = NOW()
          WHERE customer_id = '{{ inputs.customer_id }}'

  # Pattern 6: Sidecar for monitoring
  - id: attach_monitoring_sidecar
    type: io.kestra.plugin.core.flow.Subflow
    namespace: acme.sidecars
    flowId: monitoring-sidecar
    wait: false
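
The saga above only shows the happy path. When a later step fails, each completed step needs a compensating action; the classic compensations here are reversing the payment and releasing the inventory. Below is a hedged Python sketch of that compensation loop; the refund, reserve, and release endpoints are illustrative placeholders, not documented acme.com APIs.

# Sketch of saga compensation: undo completed steps in reverse order when a
# later step fails. The endpoints below are placeholders for illustration.
import requests

def refund_payment(transaction_id: str) -> None:
    requests.post("https://payments.acme.com/api/refund",
                  json={"transaction_id": transaction_id}, timeout=10).raise_for_status()

def release_inventory(order_id: str) -> None:
    requests.post("https://inventory.acme.com/api/release",
                  json={"order_id": order_id}, timeout=10).raise_for_status()

def run_saga(order_id: str, payment_data: dict) -> None:
    compensations = []  # stack of undo actions for the steps that completed
    try:
        tx = requests.post("https://payments.acme.com/api/process",
                           json=payment_data, timeout=10)
        tx.raise_for_status()
        transaction_id = tx.json()["id"]
        compensations.append(lambda: refund_payment(transaction_id))

        reservation = requests.post("https://inventory.acme.com/api/reserve",
                                    json={"order_id": order_id}, timeout=10)
        reservation.raise_for_status()
        compensations.append(lambda: release_inventory(order_id))

        requests.post("https://fulfillment.acme.com/api/process",
                      json={"order_id": order_id, "payment_reference": transaction_id},
                      timeout=10).raise_for_status()
    except requests.RequestException:
        # Undo in reverse order; in Kestra terms this is what an errors branch
        # on the saga flow would perform.
        for compensate in reversed(compensations):
            compensate()
        raise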

Best Practices for Advanced Patterns

1. Pattern Selection Guide

  • For fault tolerance: Circuit Breaker, Bulkhead, Retry

  • For distributed transactions: Saga, Event Sourcing

  • For migration: Strangler Fig, Dual Write

  • For scalability: CQRS, Sharding, Partitioning

  • For observability: Sidecar, Ambassador

2. Testing Advanced Patterns

# Test circuit breaker
id: test-circuit-breaker
namespace: acme.tests

tasks:
  - id: simulate_failures
    type: io.kestra.plugin.core.flow.EachSequential
    value: "{{ range(1, 6) }}"
    tasks:
      - id: call_failing_service
        type: io.kestra.plugin.core.http.Request
        uri: "http://mock-service/fail"
        allowFailure: true

  - id: verify_circuit_tripped
    type: io.kestra.plugin.core.cache.Get
    key: "circuit_breaker:mock-service"

  - id: assert_circuit_open
    type: io.kestra.plugin.core.condition.Expression
    expression: '{{ outputs.verify_circuit_tripped.value.state == "open" }}'
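
You can also exercise the breaker logic itself before wiring it into a flow. The sketch below is a minimal, self-contained Python circuit breaker using the same 5-failure threshold; the class and method names are illustrative, not a library API.

# Minimal circuit-breaker sketch for local testing; names are illustrative.
# A threshold of 5 failures matches the configuration used in this article.
import time

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 300.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if time.monotonic() - self.opened_at >= self.reset_timeout:
            return "half_open"  # allow one trial call after the timeout
        return "open"

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

# Simulate five consecutive failures and assert the circuit trips.
breaker = CircuitBreaker()
for _ in range(5):
    breaker.record_failure()
assert breaker.state == "open"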

3. Monitoring and Observability

id: pattern-metrics-collector
namespace: acme.monitoring

tasks:
  - id: collect_pattern_metrics
    type: io.kestra.plugin.scripts.python.Script
    script: |
      # Collect metrics for each pattern.
      # The count_* and calculate_* helpers below are placeholders: implement
      # them against your own state store (cache keys, database tables, etc.).
      import json

      metrics = {
          'circuit_breaker': {
              'open_circuits': count_open_circuits(),
              'tripped_today': count_trips_today()
          },
          'saga': {
              'active_sagas': count_active_sagas(),
              'compensation_rate': calculate_compensation_rate()
          },
          'cqrs': {
              'read_lag_ms': calculate_read_lag(),
              'write_throughput': calculate_write_throughput()
          }
      }

      print(json.dumps(metrics, indent=2))
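
One of those placeholder helpers is easy to make concrete. Assuming the breaker state is stored as JSON under keys like circuit_breaker:<service> (the convention used in the test flow above) in Redis, counting open circuits might look like this:

# Sketch of one placeholder metric: count open circuits.
# Assumes breaker state is stored as JSON under "circuit_breaker:<service>"
# keys in Redis; adapt the key pattern to your actual state store.
import json
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def count_open_circuits() -> int:
    open_count = 0
    for key in r.scan_iter("circuit_breaker:*"):
        state = json.loads(r.get(key) or "{}")
        if state.get("state") == "open":
            open_count += 1
    return open_count

print(count_open_circuits())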

4. Documentation Strategy

# Annotate patterns in flow metadata
labels:
  patterns: "circuit-breaker,saga,cqrs"
  complexity: "advanced"
  team: "platform-engineering"

description: |
  ## Applied Patterns

  ### Circuit Breaker
  - Prevents cascading failures from payment service
  - Config: 5 failures trip the circuit, 5-minute timeout

  ### Saga Pattern  
  - Manages distributed order processing
  - Compensation: reverse payment, release inventory

  ### CQRS
  - Separate read/write models for orders
  - Eventual consistency: < 1 second lag

Common Anti-Patterns to Avoid

1. Over-Engineering

# BAD: Using saga for simple local transaction
# GOOD: Use simple sequential tasks for local operations

2. Pattern Misapplication

# BAD: Using event sourcing for simple CRUD
# GOOD: Event sourcing for audit-critical domains only

3. Ignoring Consistency Boundaries

# BAD: Distributed transaction across 10 microservices
# GOOD: Saga within bounded context, eventual consistency between

4. Neglecting Monitoring

# BAD: Complex patterns without observability
# GOOD: Comprehensive metrics for each pattern

Conclusion: Becoming a Workflow Architect

You've now explored the most powerful patterns in workflow orchestration. Remember:

  1. Patterns are tools, not goals - Use them to solve specific problems

  2. Start simple, add complexity gradually - Don't over-engineer from day one

  3. Monitor everything - Complex patterns need comprehensive observability

  4. Document decisions - Future you (and your team) will thank you

  5. Test thoroughly - Edge cases multiply with complexity

Your Advanced Challenge:

Pick one complex business problem and solve it with multiple patterns:

  1. E-commerce checkout with inventory, payment, and fulfillment

  2. Data pipeline with validation, transformation, and quality checks

  3. User onboarding with verification, provisioning, and welcome sequence

  4. Report generation with data collection, processing, and distribution

  5. System migration from monolith to microservices

Share your architecture in the comments!


In the next article, we'll explore Kestra's Plugin Ecosystem - how to extend Kestra with custom functionality and integrate with any system.

Before the next article, try:

  1. Implement one new pattern in an existing workflow

  2. Create a reusable pattern template for your team

  3. Set up monitoring for pattern-specific metrics

  4. Refactor a complex workflow using appropriate patterns

  5. Document your pattern decisions and trade-offs

Remember: Mastery comes from practice, not just knowledge. Build, iterate, and learn from each implementation.

Happy orchestrating! 🚀