Article 5: Advanced Workflow Patterns in Kestra
Mastering Complex Orchestration Scenarios
Introduction: The Orchestrator's Toolkit
Imagine you're conducting a symphony. You don't just wave your baton; you cue sections, adjust tempo, handle surprises, and keep everything in harmony. That's what advanced workflow patterns give you in Kestra: the ability to conduct complex data orchestrations with precision and grace.
In this article, we'll move beyond simple linear pipelines into sophisticated orchestration patterns that handle real-world complexity. You'll learn patterns that experienced data teams use to build resilient, scalable, and intelligent workflows.
What makes a workflow "advanced"?
Intelligence: Making decisions based on data
Resilience: Graceful handling of the unexpected
Efficiency: Optimal resource usage and parallelism
Adaptability: Dynamic behavior based on context
Maintainability: Clean, reusable patterns
Pattern 1: Dynamic Parallel Processing
Beyond Simple Parallel Tasks
While Kestra's Parallel task is great for fixed concurrency, real-world scenarios often require dynamic parallelism based on data volume or external factors.
id: dynamic-parallel-processing
namespace: acme.advanced
description: Dynamic parallelism based on data volume
tasks:
- id: discover_files
type: io.kestra.plugin.core.http.Request
uri: "https://api.acme.com/files"
store: true
- id: calculate_optimal_parallelism
type: io.kestra.plugin.scripts.python.Script
inputFiles:
files.json: "{{ outputs.discover_files.uri }}"
script: |
import json
import math
with open('files.json', 'r') as f:
files = json.load(f)
total_files = len(files['files'])
avg_file_size = sum(f['size'] for f in files['files']) / total_files if total_files > 0 else 0
# Dynamic parallelism calculation
# Rule: 1 worker per 100MB, max 10 workers
max_workers = min(10, max(1, math.ceil((total_files * avg_file_size) / (100 * 1024 * 1024))))
# Split files into batches
batch_size = math.ceil(total_files / max_workers)
batches = []
for i in range(0, total_files, batch_size):
batches.append(files['files'][i:i + batch_size])
# Expose results to downstream tasks (requires the `kestra` pip package);
# a plain print() would not populate outputs.<task>.vars
from kestra import Kestra
Kestra.outputs({
'max_workers': max_workers,
'batch_size': batch_size,
'batches': batches,
'total_files': total_files
})
- id: process_in_dynamic_parallel
type: io.kestra.plugin.core.flow.EachParallel
value: "{{ outputs.calculate_optimal_parallelism.vars.batches }}"
maxParallel: "{{ outputs.calculate_optimal_parallelism.vars.max_workers }}"
tasks:
- id: process_batch
type: io.kestra.plugin.scripts.python.Script
description: "Process one batch of files"
inputFiles:
batch_files.json: |
{{ taskrun.value }}
script: |
import json
import time
with open('batch_files.json', 'r') as f:
files = json.load(f)
# Simulate processing
results = []
for file in files:
time.sleep(0.1) # Simulate work
results.append({
'file': file['name'],
'processed': True,
'size': file['size']
})
print(f"Processed {len(results)} files")
print(json.dumps({'results': results}))
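The sizing heuristic in calculate_optimal_parallelism is easy to verify in isolation. Here is the same logic as a plain function, using the rule from the flow above (one worker per 100 MB of data, capped at 10 workers):

```python
import math

def plan_batches(files, bytes_per_worker=100 * 1024 * 1024, max_workers_cap=10):
    """Split files into batches sized by total volume: one worker per
    bytes_per_worker of data, capped at max_workers_cap workers."""
    total_files = len(files)
    if total_files == 0:
        return {'max_workers': 0, 'batch_size': 0, 'batches': []}
    total_bytes = sum(f['size'] for f in files)
    max_workers = min(max_workers_cap, max(1, math.ceil(total_bytes / bytes_per_worker)))
    batch_size = math.ceil(total_files / max_workers)
    batches = [files[i:i + batch_size] for i in range(0, total_files, batch_size)]
    return {'max_workers': max_workers, 'batch_size': batch_size, 'batches': batches}
```

For example, 300 files of 1 MB each total 300 MB, which yields 3 workers and batches of 100 files. Note the heuristic keys off total volume, so many small files and a few large ones get the same worker count.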
Fan-Out/Fan-In Pattern
This pattern distributes work across many workers (fan-out) and aggregates results (fan-in).
id: fan-out-fan-in-pattern
namespace: acme.advanced
description: Distribute work, process in parallel, aggregate results
tasks:
# Fan-Out: Distribute work
- id: generate_work_items
type: io.kestra.plugin.core.debug.Return
format: >
{{ range(1, 101) | map('toString') | list | tojson }}
# Parallel Processing
- id: parallel_processing
type: io.kestra.plugin.core.flow.EachParallel
value: "{{ outputs.generate_work_items.value }}"
maxParallel: 20
tasks:
- id: process_item
type: io.kestra.plugin.scripts.python.Script
script: |
import time
import random
# Simulate variable processing time
processing_time = random.uniform(0.1, 2.0)
time.sleep(processing_time)
# Generate result
result = {
'item': {{ task.value }},
'processed_at': time.time(),
'processing_time': processing_time,
'result': {{ task.value }} * 2 # Simple transformation
}
print(f"Processed item {{ task.value }} in {processing_time:.2f}s")
# Return result
import json
print(json.dumps(result))
# Fan-In: Aggregate results
- id: aggregate_results
type: io.kestra.plugin.scripts.python.Script
description: "Aggregate all parallel results"
script: |
import json
import statistics
# Collect all results from parallel tasks
all_results = []
processing_times = []
{% for i in range(1, 101) %}
try:
result_json = '''{{ outputs.parallel_processing.outputs["process_item_" + i|string].vars | default('{}') | tojson }}'''
if result_json:
result = json.loads(result_json)
if isinstance(result, dict) and 'result' in result:
all_results.append(result)
if 'processing_time' in result:
processing_times.append(result['processing_time'])
except (json.JSONDecodeError, KeyError):
pass
{% endfor %}
# Calculate statistics
stats = {
'total_processed': len(all_results),
'average_processing_time': statistics.mean(processing_times) if processing_times else 0,
'total_processing_time': sum(processing_times),
'throughput': len(all_results) / (sum(processing_times) if processing_times else 1),
'results_summary': {
'min': min(r['result'] for r in all_results) if all_results else 0,
'max': max(r['result'] for r in all_results) if all_results else 0,
'avg': statistics.mean(r['result'] for r in all_results) if all_results else 0
}
}
print(json.dumps(stats, indent=2))
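Outside of Kestra, the same fan-out/fan-in shape is what concurrent.futures gives you: distribute items to a bounded worker pool, then aggregate once every worker finishes. A minimal sketch mirroring the flow above:

```python
import statistics
from concurrent.futures import ThreadPoolExecutor

def process_item(item):
    # Stand-in for real per-item work: a simple transformation
    return {'item': item, 'result': item * 2}

def fan_out_fan_in(items, max_parallel=20):
    # Fan-out: bounded parallel map over the work items
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        results = list(pool.map(process_item, items))
    # Fan-in: aggregate all results into summary statistics
    return {
        'total_processed': len(results),
        'min': min(r['result'] for r in results),
        'max': max(r['result'] for r in results),
        'avg': statistics.mean(r['result'] for r in results),
    }
```

The `maxParallel: 20` in the flow plays the same role as `max_workers` here: it bounds concurrency so a burst of work items cannot exhaust the executor.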
Pattern 2: Circuit Breaker Pattern
Preventing Cascading Failures
The circuit breaker pattern prevents systems from being overwhelmed by repeated failures, similar to an electrical circuit breaker.
id: circuit-breaker-pattern
namespace: acme.advanced
description: Implement circuit breaker for unreliable external services
variables:
circuit_breaker_state_key: "circuit_breaker:api.acme.com"
failure_threshold: 5
reset_timeout: PT5M
tasks:
- id: check_circuit_breaker
type: io.kestra.plugin.core.cache.Get
key: "{{ vars.circuit_breaker_state_key }}"
- id: evaluate_circuit_state
type: io.kestra.plugin.core.flow.Switch
value: "{{ outputs.check_circuit_breaker.value.state | default('closed') }}"
cases:
open:
- id: circuit_open_check_timeout
type: io.kestra.plugin.core.condition.Expression
expression: >
{{ now().timestamp() - outputs.check_circuit_breaker.value.timestamp > 300 }}
- id: attempt_reset
type: io.kestra.plugin.core.log.Log
message: "Circuit breaker open for {{ vars.reset_timeout }}. Testing if service recovered..."
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: "{{ outputs.circuit_open_check_timeout.value }}"
- id: skip_service_call
type: io.kestra.plugin.core.log.Log
message: "Circuit breaker is OPEN - skipping service call"
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: "{{ not outputs.circuit_open_check_timeout.value }}"
- id: call_external_service
type: io.kestra.plugin.core.http.Request
uri: "https://api.acme.com/unstable-endpoint"
method: GET
retry:
type: exponential
maxAttempt: 2
delay: PT5S
timeout: PT30S
allowFailure: true
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: >
{{ outputs.check_circuit_breaker.value.state != 'open' or
outputs.circuit_open_check_timeout.value == true }}
- id: update_circuit_state
type: io.kestra.plugin.core.flow.Switch
value: "{{ outputs.call_external_service.state.current }}"
cases:
SUCCESS:
- id: reset_circuit_on_success
type: io.kestra.plugin.core.cache.Put
key: "{{ vars.circuit_breaker_state_key }}"
value: |
{
"state": "closed",
"failure_count": 0,
"last_success": "{{ now() }}",
"timestamp": {{ now().timestamp() }}
}
ttl: PT1H
FAILED:
- id: get_current_failure_count
type: io.kestra.plugin.core.cache.Get
key: "{{ vars.circuit_breaker_state_key }}_failures"
defaultValue: "0"
- id: increment_failure_count
type: io.kestra.plugin.core.cache.Put
key: "{{ vars.circuit_breaker_state_key }}_failures"
value: "{{ outputs.get_current_failure_count.value | int + 1 }}"
ttl: PT1H
- id: check_if_should_trip
type: io.kestra.plugin.core.condition.Expression
expression: >
{{ (outputs.get_current_failure_count.value | int + 1) >= vars.failure_threshold }}
- id: trip_circuit_breaker
type: io.kestra.plugin.core.cache.Put
key: "{{ vars.circuit_breaker_state_key }}"
value: |
{
"state": "open",
"tripped_at": "{{ now() }}",
"failure_count": {{ outputs.get_current_failure_count.value | int + 1 }},
"timestamp": {{ now().timestamp() }}
}
ttl: "{{ vars.reset_timeout }}"
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: "{{ outputs.check_if_should_trip.value }}"
- id: log_failure
type: io.kestra.plugin.core.log.Log
message: >
Service call failed. Failure count: {{ outputs.get_current_failure_count.value | int + 1 }}/{{ vars.failure_threshold }}
{% if outputs.check_if_should_trip.value %}
Circuit breaker TRIPPED to OPEN state
{% endif %}
- id: fallback_operation
type: io.kestra.plugin.core.debug.Return
format: "Using cached data or alternative service"
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: >
{{ outputs.call_external_service.state.current == 'FAILED' or
(outputs.check_circuit_breaker.value.state == 'open' and
not outputs.circuit_open_check_timeout.value) }}
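The cache-based state machine above is easier to reason about as a standalone class. This sketch keeps the same behavior (closed, open, and a half-open probe once the reset timeout elapses); the threshold and timeout defaults mirror the flow's variables, and the injectable clock is just there to make it testable:

```python
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=300.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock  # injectable for testing
        self.failure_count = 0
        self.opened_at = None  # None means the circuit is closed

    def allow_request(self):
        # Closed: pass through. Open: block until the reset timeout
        # elapses, then allow a half-open probe request.
        if self.opened_at is None:
            return True
        return self.clock() - self.opened_at >= self.reset_timeout

    def record_success(self):
        # Any success closes the circuit and clears the failure count
        self.failure_count = 0
        self.opened_at = None

    def record_failure(self):
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.opened_at = self.clock()  # trip to open
```

Call allow_request() before the external call; on False, skip straight to the fallback, exactly as the flow's skip_service_call branch does.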
Advanced Circuit Breaker with Monitoring
id: monitored-circuit-breaker
namespace: acme.advanced
tasks:
- id: circuit_breaker_monitor
type: io.kestra.plugin.scripts.python.Script
script: |
import redis
import json
from datetime import datetime
# Connect to shared state store
r = redis.Redis(host='redis', port=6379, decode_responses=True)
# Get all circuit breaker states
circuit_keys = r.keys("circuit_breaker:*")
circuits = []
for key in circuit_keys:
state = r.get(key)
if state:
circuits.append({
'service': key.replace('circuit_breaker:', ''),
'state': json.loads(state),
'age_seconds': datetime.now().timestamp() - json.loads(state).get('timestamp', 0)
})
# Generate health report
report = {
'timestamp': datetime.now().isoformat(),
'total_circuits': len(circuits),
'open_circuits': [c for c in circuits if c['state'].get('state') == 'open'],
'half_open_circuits': [c for c in circuits if c['state'].get('state') == 'half_open'],
'closed_circuits': [c for c in circuits if c['state'].get('state') == 'closed'],
'health_score': len([c for c in circuits if c['state'].get('state') == 'closed']) / len(circuits) if circuits else 1.0
}
# Alert if health score drops below threshold
if report['health_score'] < 0.8:
print("ALERT: Circuit breaker health score below 80%")
print(json.dumps(report, indent=2))
- id: send_circuit_health_alert
type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
url: "{{ secret('SLACK_WEBHOOK_URL') }}"
payload: |
{
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "⚡ Circuit Breaker Health Report"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Health Score:* {{ outputs.circuit_breaker_monitor.vars.health_score | round(2) * 100 }}%\n*Open Circuits:* {{ outputs.circuit_breaker_monitor.vars.open_circuits | length }}\n*Total Circuits:* {{ outputs.circuit_breaker_monitor.vars.total_circuits }}"
}
}
]
}
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: "{{ outputs.circuit_breaker_monitor.vars.health_score < 0.8 }}"
Pattern 3: Saga Pattern for Distributed Transactions
Implementing Compensating Transactions
The Saga pattern manages distributed transactions by breaking them into a sequence of local transactions, each with a compensating action.
id: saga-pattern-order-processing
namespace: acme.ecommerce
description: Distributed transaction with compensation logic
inputs:
- name: order_data
type: JSON
required: true
tasks:
# SAGA: Main transaction sequence
- id: saga_orchestrator
type: io.kestra.plugin.core.flow.Sequential
tasks:
# Step 1: Reserve inventory (with compensation)
- id: reserve_inventory
type: io.kestra.plugin.core.http.Request
uri: "https://inventory.acme.com/api/reserve"
method: POST
body: |
{
"items": {{ inputs.order_data.items | tojson }},
"order_id": "{{ execution.id }}"
}
store: true
allowFailure: false
# Step 2: Process payment (with compensation)
- id: process_payment
type: io.kestra.plugin.core.http.Request
uri: "https://payments.acme.com/api/charge"
method: POST
body: |
{
"order_id": "{{ execution.id }}",
"amount": {{ inputs.order_data.total }},
"payment_method": {{ inputs.order_data.payment_method | tojson }}
}
store: true
allowFailure: false
# Step 3: Schedule shipping (with compensation)
- id: schedule_shipping
type: io.kestra.plugin.core.http.Request
uri: "https://shipping.acme.com/api/schedule"
method: POST
body: |
{
"order_id": "{{ execution.id }}",
"address": {{ inputs.order_data.shipping_address | tojson }},
"items": {{ inputs.order_data.items | tojson }}
}
store: true
allowFailure: false
# Step 4: Send confirmation (no compensation needed - idempotent)
- id: send_confirmation
type: io.kestra.plugin.core.http.Request
uri: "https://notifications.acme.com/api/send"
method: POST
body: |
{
"order_id": "{{ execution.id }}",
"customer_email": "{{ inputs.order_data.customer_email }}",
"template": "order_confirmation"
}
store: true
allowFailure: true # Non-critical step
# SAGA: Compensation handlers (run on failure)
- id: compensation_orchestrator
type: io.kestra.plugin.core.flow.Sequential
tasks:
# Compensate in reverse order
- id: cancel_shipping_if_scheduled
type: io.kestra.plugin.core.http.Request
uri: "https://shipping.acme.com/api/cancel/{{ execution.id }}"
method: DELETE
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: >
{{ outputs.saga_orchestrator.outputs.schedule_shipping != null and
outputs.saga_orchestrator.outputs.schedule_shipping.state.current == 'SUCCESS' }}
- id: refund_payment_if_charged
type: io.kestra.plugin.core.http.Request
uri: "https://payments.acme.com/api/refund/{{ execution.id }}"
method: POST
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: >
{{ outputs.saga_orchestrator.outputs.process_payment != null and
outputs.saga_orchestrator.outputs.process_payment.state.current == 'SUCCESS' }}
- id: release_inventory_if_reserved
type: io.kestra.plugin.core.http.Request
uri: "https://inventory.acme.com/api/release/{{ execution.id }}"
method: DELETE
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: >
{{ outputs.saga_orchestrator.outputs.reserve_inventory != null and
outputs.saga_orchestrator.outputs.reserve_inventory.state.current == 'SUCCESS' }}
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: "{{ outputs.saga_orchestrator.state.current == 'FAILED' }}"
# SAGA: State persistence
- id: persist_saga_state
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
INSERT INTO saga_states
(saga_id, execution_id, step, status, data, created_at, compensated)
VALUES (
'order_processing',
'{{ execution.id }}',
'{{ taskrun.id }}',
'{{ taskrun.state.current }}',
'{{ taskrun.value | tojson }}',
NOW(),
{{ outputs.compensation_orchestrator != null }}
)
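The core of the orchestrated saga fits in a few lines: run the steps in order, and on failure run the compensations of the steps that already succeeded, in reverse. A sketch (the step names in the usage are hypothetical stand-ins for the inventory/payment/shipping calls):

```python
def run_saga(steps):
    """Each step is an (action, compensation) pair. On failure, run the
    compensations of completed steps in reverse order, then re-raise."""
    completed = []
    try:
        for action, compensation in steps:
            action()
            completed.append(compensation)
    except Exception:
        for compensation in reversed(completed):
            compensation()  # best-effort rollback
        raise
```

This is why the flow compensates in reverse order: shipping depends on payment, which depends on inventory, so rollback must unwind the dependency chain from the top.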
Choreographed Saga Pattern
id: choreographed-saga
namespace: acme.advanced
description: Event-driven saga pattern
tasks:
- id: start_saga
type: io.kestra.plugin.core.debug.Return
format: "SAGA_STARTED:{{ execution.id }}"
- id: publish_order_created
type: io.kestra.plugin.kafka.Produce
topic: "order-events"
key: "{{ execution.id }}"
value: |
{
"event_type": "ORDER_CREATED",
"order_id": "{{ execution.id }}",
"timestamp": "{{ now() }}",
"data": {{ inputs.order_data | tojson }}
}
properties:
bootstrap.servers: "kafka:9092"
# Each service listens to events and publishes its own
# This is more decoupled but harder to monitor
- id: monitor_saga_completion
type: io.kestra.plugin.kafka.Consume
topic: "saga-completion-events"
groupId: "saga-monitor-{{ execution.id }}"
maxRecords: 1
timeout: PT5M
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: "{{ outputs.publish_order_created.state.current == 'SUCCESS' }}"
Pattern 4: Event Sourcing with Kestra
Building Event-Sourced Systems
Event sourcing stores state changes as a sequence of events, providing audit trails and time-travel capabilities.
id: event-sourced-shopping-cart
namespace: acme.ecommerce
description: Event-sourced shopping cart implementation
inputs:
- name: cart_id
type: STRING
required: false
defaults: "cart_{{ execution.id }}"
- name: action
type: STRING
required: true
choices: ["add_item", "remove_item", "update_quantity", "clear_cart", "checkout"]
- name: item_data
type: JSON
required: false
tasks:
# Validate action based on current state
- id: load_current_state
type: io.kestra.plugin.jdbc.postgresql.Query
fetch: true
sql: |
WITH current_state AS (
SELECT
cart_id,
SUM(CASE WHEN event_type = 'ITEM_ADDED' THEN 1 ELSE 0 END) as items_added,
SUM(CASE WHEN event_type = 'ITEM_REMOVED' THEN 1 ELSE 0 END) as items_removed,
MAX(occurred_at) as last_event_at
FROM cart_events
WHERE cart_id = '{{ inputs.cart_id }}'
GROUP BY cart_id
),
last_checkout AS (
SELECT MAX(occurred_at) as checkout_time
FROM cart_events
WHERE cart_id = '{{ inputs.cart_id }}'
AND event_type = 'CHECKOUT_COMPLETED'
)
SELECT
cs.cart_id,
cs.items_added - cs.items_removed as current_item_count,
lc.checkout_time
FROM current_state cs
LEFT JOIN last_checkout lc ON 1=1
ignoreMissing: true
- id: validate_action
type: io.kestra.plugin.core.flow.Switch
value: "{{ inputs.action }}"
cases:
checkout:
- id: check_cart_not_empty
type: io.kestra.plugin.core.condition.Expression
expression: >
{{ outputs.load_current_state.rows[0].current_item_count | default(0) > 0 }}
errorMessage: "Cannot checkout empty cart"
add_item:
- id: validate_item_data
type: io.kestra.plugin.core.condition.Expression
expression: >
{{ inputs.item_data != null and
inputs.item_data.item_id and
inputs.item_data.quantity > 0 }}
errorMessage: "Invalid item data"
# Store the event
- id: store_event
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
INSERT INTO cart_events
(event_id, cart_id, event_type, event_data, occurred_at, version)
VALUES (
'{{ execution.id }}',
'{{ inputs.cart_id }}',
'{{ inputs.action | upper }}',
'{{ inputs.item_data | default({}) | tojson }}',
NOW(),
COALESCE(
(SELECT MAX(version) + 1
FROM cart_events
WHERE cart_id = '{{ inputs.cart_id }}'),
1
)
)
# Project current state from events
- id: project_current_state
type: io.kestra.plugin.scripts.python.Script
description: "Project current cart state from event stream"
script: |
import json
# Replay events to build current state
cart_state = {
'cart_id': '{{ inputs.cart_id }}',
'items': {},
'total_quantity': 0,
'total_price': 0.0,
'event_count': 0
}
# In a real implementation, you would fetch events from DB
# For this example, we'll simulate event replay
# Process based on action
action = '{{ inputs.action }}'
item_data = {{ inputs.item_data | default('{}') | tojson }}
if action == 'add_item':
item_id = item_data.get('item_id')
if item_id:
if item_id not in cart_state['items']:
cart_state['items'][item_id] = {
'quantity': 0,
'price': item_data.get('price', 0)
}
cart_state['items'][item_id]['quantity'] += item_data.get('quantity', 1)
cart_state['total_quantity'] += item_data.get('quantity', 1)
cart_state['total_price'] += item_data.get('price', 0) * item_data.get('quantity', 1)
elif action == 'remove_item':
item_id = item_data.get('item_id')
if item_id in cart_state['items']:
quantity_to_remove = min(
item_data.get('quantity', 1),
cart_state['items'][item_id]['quantity']
)
cart_state['items'][item_id]['quantity'] -= quantity_to_remove
cart_state['total_quantity'] -= quantity_to_remove
cart_state['total_price'] -= cart_state['items'][item_id]['price'] * quantity_to_remove
if cart_state['items'][item_id]['quantity'] <= 0:
del cart_state['items'][item_id]
cart_state['event_count'] += 1
print(json.dumps(cart_state, indent=2))
# Publish event to event bus
- id: publish_to_event_bus
type: io.kestra.plugin.kafka.Produce
topic: "cart-events"
key: "{{ inputs.cart_id }}"
value: |
{
"cart_id": "{{ inputs.cart_id }}",
"event_type": "{{ inputs.action | upper }}",
"event_data": {{ inputs.item_data | default({}) | tojson }},
"timestamp": "{{ now() }}",
"event_id": "{{ execution.id }}"
}
properties:
bootstrap.servers: "kafka:9092"
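Stripped of the SQL and Kafka plumbing, an event-sourced cart is just a fold over the event list. This reducer replays the two event types the flow records (field names follow the example schema above):

```python
def replay_cart(events):
    """Rebuild cart state by folding over ITEM_ADDED / ITEM_REMOVED events."""
    items = {}
    for event in events:
        data = event['event_data']
        item_id = data['item_id']
        if event['event_type'] == 'ITEM_ADDED':
            entry = items.setdefault(item_id, {'quantity': 0, 'price': data.get('price', 0)})
            entry['quantity'] += data.get('quantity', 1)
        elif event['event_type'] == 'ITEM_REMOVED':
            if item_id in items:
                items[item_id]['quantity'] -= data.get('quantity', 1)
                if items[item_id]['quantity'] <= 0:
                    del items[item_id]  # drop fully-removed items
    total_quantity = sum(e['quantity'] for e in items.values())
    total_price = sum(e['quantity'] * e['price'] for e in items.values())
    return {'items': items, 'total_quantity': total_quantity, 'total_price': total_price}
```

Because state is derived, not stored, replaying a prefix of the event list gives you the cart as it was at any earlier point; that is the basis of the time-travel pattern below.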
Time-Travel Query Pattern
id: time-travel-query
namespace: acme.advanced
description: Query historical state at a specific point in time
inputs:
- name: entity_id
type: STRING
required: true
- name: as_of_time
type: DATETIME
required: true
tasks:
- id: replay_events_to_timestamp
type: io.kestra.plugin.jdbc.postgresql.Query
fetch: true
sql: |
SELECT
event_type,
event_data,
occurred_at,
version
FROM entity_events
WHERE entity_id = '{{ inputs.entity_id }}'
AND occurred_at <= '{{ inputs.as_of_time }}'
ORDER BY version
- id: reconstruct_historical_state
type: io.kestra.plugin.scripts.python.Script
inputFiles:
events.json: |
{{ outputs.replay_events_to_timestamp.rows | tojson }}
script: |
import json
with open('events.json', 'r') as f:
events = json.load(f)
# Start with empty state
state = {}
# Replay events in order
for event in events:
event_type = event['event_type']
event_data = event['event_data']
if event_type == 'ENTITY_CREATED':
state = event_data
elif event_type == 'ENTITY_UPDATED':
state.update(event_data)
elif event_type == 'FIELD_SET':
field_name = event_data['field']
state[field_name] = event_data['value']
elif event_type == 'FIELD_INCREMENTED':
field_name = event_data['field']
if field_name in state:
state[field_name] += event_data['amount']
else:
state[field_name] = event_data['amount']
# Add more event handlers as needed
result = {
'entity_id': '{{ inputs.entity_id }}',
'as_of_time': '{{ inputs.as_of_time }}',
'state': state,
'events_replayed': len(events),
'last_event_time': events[-1]['occurred_at'] if events else None
}
print(json.dumps(result, indent=2))
Pattern 5: CQRS (Command Query Responsibility Segregation)
Separating Read and Write Models
CQRS separates the write model from the read model so that each side can be scaled and optimized independently.
id: cqrs-user-profile
namespace: acme.social
description: CQRS pattern for user profiles
inputs:
- name: command
type: STRING
required: true
choices: ["update_profile", "get_profile", "search_profiles"]
- name: user_id
type: STRING
required: false
- name: profile_data
type: JSON
required: false
tasks:
# Command Side: Handle writes
- id: handle_command
type: io.kestra.plugin.core.flow.Switch
value: "{{ inputs.command }}"
cases:
update_profile:
- id: validate_profile_data
type: io.kestra.plugin.core.condition.Expression
expression: >
{{ inputs.profile_data != null and
inputs.user_id != null }}
errorMessage: "Missing profile data or user_id"
- id: store_write_event
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
INSERT INTO user_profile_events
(event_id, user_id, event_type, event_data, occurred_at)
VALUES (
'{{ execution.id }}',
'{{ inputs.user_id }}',
'PROFILE_UPDATED',
'{{ inputs.profile_data | tojson }}',
NOW()
)
- id: update_read_model_async
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.background
flowId: update-profile-read-model
wait: false # Fire and forget - eventual consistency
inputs:
user_id: "{{ inputs.user_id }}"
profile_data: "{{ inputs.profile_data }}"
- id: invalidate_cache
type: io.kestra.plugin.redis.Delete
key: "profile:{{ inputs.user_id }}"
host: "redis"
port: 6379
- id: send_profile_updated_event
type: io.kestra.plugin.kafka.Produce
topic: "user-profile-events"
key: "{{ inputs.user_id }}"
value: |
{
"event_type": "PROFILE_UPDATED",
"user_id": "{{ inputs.user_id }}",
"timestamp": "{{ now() }}",
"event_id": "{{ execution.id }}"
}
# Query Side: Handle reads (optimized for reading)
- id: handle_query
type: io.kestra.plugin.core.flow.Switch
value: "{{ inputs.command }}"
cases:
get_profile:
- id: try_cache_first
type: io.kestra.plugin.redis.Get
key: "profile:{{ inputs.user_id }}"
host: "redis"
port: 6379
- id: cache_hit
type: io.kestra.plugin.core.debug.Return
format: "{{ outputs.try_cache_first.value }}"
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: "{{ outputs.try_cache_first.value != null }}"
- id: cache_miss_query
type: io.kestra.plugin.jdbc.postgresql.Query
fetch: true
sql: |
SELECT
user_id,
display_name,
avatar_url,
bio,
location,
website,
follower_count,
following_count,
post_count,
last_updated
FROM user_profiles_read
WHERE user_id = '{{ inputs.user_id }}'
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: "{{ outputs.try_cache_first.value == null }}"
- id: populate_cache
type: io.kestra.plugin.redis.Put
key: "profile:{{ inputs.user_id }}"
value: |
{{ outputs.cache_miss_query.rows[0] | tojson }}
ttl: PT1H
host: "redis"
port: 6379
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: >
{{ outputs.cache_miss_query.rows | length > 0 }}
- id: return_profile
type: io.kestra.plugin.core.debug.Return
format: >
{{ outputs.cache_miss_query.rows[0] | tojson if outputs.cache_miss_query.rows else '{}' }}
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: "{{ outputs.try_cache_first.value == null }}"
search_profiles:
- id: search_read_model
type: io.kestra.plugin.elasticsearch.Search
connection:
hosts: ["elasticsearch:9200"]
index: "user_profiles"
query: |
{
"query": {
"multi_match": {
"query": "{{ inputs.profile_data.query }}",
"fields": ["display_name^2", "bio", "location"]
}
},
"size": 20
}
- id: return_search_results
type: io.kestra.plugin.core.debug.Return
format: >
{{ outputs.search_read_model.hits | tojson }}
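The query side above is the classic cache-aside read path: try the cache, fall back to the read model on a miss, then populate the cache for the next reader. As a plain function with pluggable stores (the dict-backed stores here stand in for Redis and Postgres):

```python
def get_profile(user_id, cache, read_model):
    """Cache-aside read: serve from cache on a hit; on a miss, query the
    read model and write the result back into the cache."""
    key = f'profile:{user_id}'
    cached = cache.get(key)
    if cached is not None:
        return cached, 'cache'
    row = read_model.get(user_id)
    if row is not None:
        cache[key] = row  # a real cache would also set a TTL here
    return row, 'read_model'
```

The TTL in the flow (PT1H) bounds staleness: after the write side invalidates or the entry expires, the next read repopulates the cache from the read model.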
Read Model Update Processor
id: update-profile-read-model
namespace: acme.background
description: Background processor to update read models
tasks:
- id: listen_for_profile_events
type: io.kestra.plugin.kafka.Consume
topic: "user-profile-events"
groupId: "read-model-updater"
maxRecords: 100
timeout: PT10S
- id: process_events_batch
type: io.kestra.plugin.core.flow.EachSequential
value: "{{ outputs.listen_for_profile_events.messages }}"
tasks:
- id: "process_event_{{ taskloop.index }}"
type: io.kestra.plugin.core.flow.Switch
value: "{{ task.value.value.event_type }}"
cases:
PROFILE_UPDATED:
- id: update_read_model
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
INSERT INTO user_profiles_read
(user_id, display_name, avatar_url, bio, location, website, last_updated)
VALUES (
'{{ task.value.value.user_id }}',
'{{ task.value.value.event_data.display_name | default('') }}',
'{{ task.value.value.event_data.avatar_url | default('') }}',
'{{ task.value.value.event_data.bio | default('') }}',
'{{ task.value.value.event_data.location | default('') }}',
'{{ task.value.value.event_data.website | default('') }}',
NOW()
)
ON CONFLICT (user_id) DO UPDATE SET
display_name = EXCLUDED.display_name,
avatar_url = EXCLUDED.avatar_url,
bio = EXCLUDED.bio,
location = EXCLUDED.location,
website = EXCLUDED.website,
last_updated = NOW()
- id: update_search_index
type: io.kestra.plugin.elasticsearch.Index
connection:
hosts: ["elasticsearch:9200"]
index: "user_profiles"
id: "{{ task.value.value.user_id }}"
document: |
{
"user_id": "{{ task.value.value.user_id }}",
"display_name": "{{ task.value.value.event_data.display_name | default('') }}",
"bio": "{{ task.value.value.event_data.bio | default('') }}",
"location": "{{ task.value.value.event_data.location | default('') }}",
"last_updated": "{{ now() }}"
}
FOLLOW_ADDED:
- id: increment_follower_count
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
UPDATE user_profiles_read
SET follower_count = follower_count + 1
WHERE user_id = '{{ task.value.value.target_user_id }}'
- id: increment_following_count
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
UPDATE user_profiles_read
SET following_count = following_count + 1
WHERE user_id = '{{ task.value.value.source_user_id }}'
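The projector's ON CONFLICT upsert is what makes event redelivery safe: applying the same PROFILE_UPDATED event twice leaves the read model unchanged. The same idempotent merge, sketched over an in-memory read model:

```python
def apply_profile_event(read_model, event):
    """Idempotent projection: PROFILE_UPDATED upserts the row, so
    re-delivering the same event leaves the read model unchanged."""
    if event['event_type'] != 'PROFILE_UPDATED':
        return read_model
    user_id = event['user_id']
    row = read_model.setdefault(user_id, {'user_id': user_id})
    # Merge only the fields present in the event, like EXCLUDED.* in the upsert
    for field in ('display_name', 'avatar_url', 'bio', 'location', 'website'):
        if field in event['event_data']:
            row[field] = event['event_data'][field]
    return read_model
```

Note the FOLLOW_ADDED counters in the flow are not idempotent the same way: an unconditional increment double-counts on redelivery, which is why counter projections usually track processed event ids as well.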
Pattern 6: Strangler Fig Pattern
Incremental Migration Strategy
The Strangler Fig pattern incrementally replaces a legacy system: you build new functionality around it, route more and more traffic to the new code, and retire the old system once nothing depends on it.
id: strangler-fig-migration
namespace: acme.migration
description: Incrementally migrate from legacy to new system
variables:
migration_phase: "parallel_run" # Options: shadow, parallel_run, cutover
traffic_percentage: 50 # Percentage of traffic to send to new system
tasks:
# Feature Flag: Route traffic based on migration phase
- id: route_traffic
type: io.kestra.plugin.core.flow.Switch
value: "{{ vars.migration_phase }}"
cases:
shadow:
# Shadow mode: Send to both, compare results
- id: call_legacy_system
type: io.kestra.plugin.core.http.Request
uri: "https://legacy-api.acme.com/process"
method: POST
body: "{{ inputs.request_body }}"
store: true
- id: call_new_system_shadow
type: io.kestra.plugin.core.http.Request
uri: "https://new-api.acme.com/process"
method: POST
body: "{{ inputs.request_body }}"
store: true
- id: compare_results
type: io.kestra.plugin.scripts.python.Script
inputFiles:
legacy_response.json: "{{ outputs.call_legacy_system.uri }}"
new_response.json: "{{ outputs.call_new_system_shadow.uri }}"
script: |
import json
import difflib
with open('legacy_response.json', 'r') as f:
legacy = json.load(f)
with open('new_response.json', 'r') as f:
new = json.load(f)
# Compare results
comparison = {
'match': legacy == new,
'legacy_response': legacy,
'new_response': new,
'differences': []
}
if legacy != new:
# Find specific differences
legacy_str = json.dumps(legacy, sort_keys=True, indent=2)
new_str = json.dumps(new, sort_keys=True, indent=2)
diff = list(difflib.unified_diff(
legacy_str.splitlines(),
new_str.splitlines(),
lineterm=''
))
comparison['differences'] = diff[:10] # First 10 differences
print(json.dumps(comparison, indent=2))
- id: return_legacy_response
type: io.kestra.plugin.core.debug.Return
format: "{{ outputs.call_legacy_system.body | tojson }}"
parallel_run:
# Parallel run: Split traffic between systems
- id: decide_routing
type: io.kestra.plugin.core.debug.Return
format: >
{{ random() * 100 <= vars.traffic_percentage }}
- id: call_new_system_parallel
type: io.kestra.plugin.core.http.Request
uri: "https://new-api.acme.com/process"
method: POST
body: "{{ inputs.request_body }}"
store: true
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: "{{ outputs.decide_routing.value == true }}"
- id: call_legacy_system_fallback
type: io.kestra.plugin.core.http.Request
uri: "https://legacy-api.acme.com/process"
method: POST
body: "{{ inputs.request_body }}"
store: true
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: "{{ outputs.decide_routing.value == false }}"
- id: return_response
type: io.kestra.plugin.core.flow.Switch
value: "{{ outputs.decide_routing.value }}"
cases:
true:
- id: return_new_response
type: io.kestra.plugin.core.debug.Return
format: "{{ outputs.call_new_system_parallel.body | tojson }}"
false:
- id: return_legacy_response_fallback
type: io.kestra.plugin.core.debug.Return
format: "{{ outputs.call_legacy_system_fallback.body | tojson }}"
cutover:
# Cutover: All traffic to new system
- id: call_new_system_primary
type: io.kestra.plugin.core.http.Request
uri: "https://new-api.acme.com/process"
method: POST
body: "{{ inputs.request_body }}"
store: true
- id: return_new_response_primary
type: io.kestra.plugin.core.debug.Return
format: "{{ outputs.call_new_system_primary.body | tojson }}"
- id: legacy_system_backup
type: io.kestra.plugin.core.http.Request
uri: "https://legacy-api.acme.com/process"
method: POST
body: "{{ inputs.request_body }}"
store: true
allowFailure: true # Legacy might be turned off
# Migration Monitoring
- id: track_migration_metrics
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
INSERT INTO migration_metrics
(migration_phase, traffic_percentage, request_count,
success_count, error_count, avg_response_time, timestamp)
SELECT
'{{ vars.migration_phase }}',
{{ vars.traffic_percentage }},
COUNT(*) as request_count,
SUM(CASE WHEN status_code = 200 THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN status_code != 200 THEN 1 ELSE 0 END) as error_count,
AVG(response_time_ms) as avg_response_time,
NOW()
FROM api_requests
WHERE timestamp >= NOW() - INTERVAL '1 minute'
runIf: "{{ vars.migration_phase != 'completed' }}"
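Outside Kestra, the percentage-based routing decision at the heart of the parallel-run phase boils down to a one-liner. Here is a minimal Python sketch of that decision (the function and variable names are illustrative, not part of any Kestra API):

```python
import random

def route_to_new_system(traffic_percentage: float, rng: random.Random) -> bool:
    """Mirror of the `random() * 100 <= vars.traffic_percentage`
    expression used by the decide_routing task above."""
    return rng.random() * 100 <= traffic_percentage

# With a seeded generator, roughly 25% of requests should be routed
# to the new system when traffic_percentage is 25.
rng = random.Random(42)
share = sum(route_to_new_system(25, rng) for _ in range(10_000)) / 10_000
print(share)  # close to 0.25
```

Seeding the generator makes the split reproducible in tests while staying uniform in production.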
Data Migration with Dual Write
id: dual-write-migration
namespace: acme.migration
description: Dual write to both old and new databases during migration
tasks:
- id: write_to_both_systems
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: write_to_legacy
type: io.kestra.plugin.jdbc.postgresql.Query
url: "jdbc:postgresql://legacy-db.acme.com:5432/app"
username: "{{ secret('LEGACY_DB_USER') }}"
password: "{{ secret('LEGACY_DB_PASSWORD') }}"
sql: |
INSERT INTO orders
(order_id, customer_id, amount, status, created_at)
VALUES (
'{{ inputs.order_id }}',
'{{ inputs.customer_id }}',
{{ inputs.amount }},
'pending',
NOW()
)
allowFailure: false
- id: write_to_new
type: io.kestra.plugin.jdbc.postgresql.Query
url: "jdbc:postgresql://new-db.acme.com:5432/app"
username: "{{ secret('NEW_DB_USER') }}"
password: "{{ secret('NEW_DB_PASSWORD') }}"
sql: |
INSERT INTO orders
(order_id, customer_id, amount, status, created_at, metadata)
VALUES (
'{{ inputs.order_id }}',
'{{ inputs.customer_id }}',
{{ inputs.amount }},
'pending',
NOW(),
'{"migration_phase": "dual_write"}'
)
allowFailure: true # New system might have issues
- id: verify_data_consistency
type: io.kestra.plugin.scripts.python.Script
description: "Verify data was written to both systems consistently"
script: |
import psycopg2
import json
def query_db(host, dbname, user, password, query):
conn = psycopg2.connect(
host=host, dbname=dbname, user=user, password=password
)
cur = conn.cursor()
cur.execute(query)
result = cur.fetchone()
cur.close()
conn.close()
return result
# Query both systems
# Kestra renders {{ inputs.order_id }} before the script runs,
# so plain strings (not f-strings) are correct here
legacy_query = """
SELECT order_id, customer_id, amount, status
FROM orders
WHERE order_id = '{{ inputs.order_id }}'
"""
new_query = legacy_query  # same query, same schema in both systems
try:
legacy_result = query_db(
'legacy-db.acme.com', 'app',
"{{ secret('LEGACY_DB_USER') }}", "{{ secret('LEGACY_DB_PASSWORD') }}",
legacy_query
)
except Exception as e:
legacy_result = None
print(f"Legacy query failed: {e}")
try:
new_result = query_db(
'new-db.acme.com', 'app',
"{{ secret('NEW_DB_USER') }}", "{{ secret('NEW_DB_PASSWORD') }}",
new_query
)
except Exception as e:
new_result = None
print(f"New query failed: {e}")
# Compare results
comparison = {
'order_id': '{{ inputs.order_id }}',
'legacy_found': legacy_result is not None,
'new_found': new_result is not None,
'consistent': legacy_result == new_result,
'legacy_data': legacy_result,
'new_data': new_result
}
print(json.dumps(comparison, indent=2, default=str))
if not comparison['consistent']:
# Log inconsistency for manual review
print("WARNING: Data inconsistency detected!")
# Expose the comparison so the next task can read
# outputs.verify_data_consistency.vars.consistent
from kestra import Kestra
Kestra.outputs(comparison)
- id: handle_inconsistency
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.reconciliation
flowId: reconcile-dual-write
wait: false # Async reconciliation
runIf: >
{{ outputs.verify_data_consistency.vars.consistent == false }}
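One subtlety in the consistency check above: during dual write, the new system carries extra migration metadata that the legacy system does not, so raw row comparison will always flag a mismatch. A hedged Python sketch of a comparison that ignores migration-only columns (the field names here are assumptions):

```python
import json
from typing import Optional

def compare_records(legacy: Optional[dict], new: Optional[dict],
                    ignore_fields: tuple = ("metadata",)) -> dict:
    """Compare one row from each system, ignoring columns that only
    exist in the new schema (e.g. the dual-write metadata column)."""
    def strip(rec: dict) -> dict:
        return {k: v for k, v in rec.items() if k not in ignore_fields}
    return {
        "legacy_found": legacy is not None,
        "new_found": new is not None,
        "consistent": bool(legacy is not None and new is not None
                           and strip(legacy) == strip(new)),
    }

legacy = {"order_id": "o-1", "amount": 99.5, "status": "pending"}
new = {**legacy, "metadata": {"migration_phase": "dual_write"}}
print(json.dumps(compare_records(legacy, new), indent=2))
```

Without the `ignore_fields` filter, every dual-written row would be reported as inconsistent and the async reconciliation subflow would fire on every order.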
Pattern 7: Sidecar Pattern for Cross-Cutting Concerns
Implementing Cross-Cutting Logic
The sidecar pattern attaches supporting functionality to the main workflow without modifying its core logic.
id: main-business-workflow
namespace: acme.business
description: Main business logic with sidecar attachments
tasks:
- id: business_logic
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: step1
type: io.kestra.plugin.core.log.Log
message: "Business step 1"
- id: step2
type: io.kestra.plugin.core.debug.Return
format: "Business step 2 result"
- id: step3
type: io.kestra.plugin.core.log.Log
message: "Business step 3 with result: {{ outputs.step2.value }}"
# Sidecar flow that can be attached to any workflow
id: monitoring-sidecar
namespace: acme.sidecars
description: Attachable monitoring and observability sidecar
tasks:
- id: start_monitoring
type: io.kestra.plugin.core.debug.Return
format: "MONITORING_START:{{ parent.execution.id }}"
- id: collect_metrics
type: io.kestra.plugin.core.http.Request
uri: "http://metrics-collector.acme.com/api/metrics"
method: POST
body: |
{
"execution_id": "{{ parent.execution.id }}",
"flow_id": "{{ parent.flow.id }}",
"namespace": "{{ parent.flow.namespace }}",
"start_time": "{{ parent.execution.startDate }}",
"metrics": {
"task_count": {{ parent.tasks | length }},
"inputs": {{ parent.inputs | default({}) | toJson }}
}
}
store: true
- id: log_performance
type: io.kestra.plugin.core.log.Log
message: |
Sidecar monitoring for {{ parent.execution.id }}
Flow: {{ parent.flow.id }}
Tasks monitored: {{ parent.tasks | length }}
Composed Workflow with Sidecars
id: composed-workflow-with-sidecars
namespace: acme.composed
description: Compose main workflow with multiple sidecars
tasks:
# Attach monitoring sidecar
- id: attach_monitoring
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sidecars
flowId: monitoring-sidecar
wait: false # Run alongside
# Attach auditing sidecar
- id: attach_auditing
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sidecars
flowId: auditing-sidecar
wait: false
# Main business logic
- id: main_business_process
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.business
flowId: main-business-workflow
wait: true
# Attach cleanup sidecar (runs after main process)
- id: attach_cleanup
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sidecars
flowId: cleanup-sidecar
wait: false
runIf: >
{{ outputs.main_business_process.state.current in ['SUCCESS', 'FAILED'] }}
Pattern 8: Pipeline Pattern with Middleware
Building Processing Pipelines
id: data-processing-pipeline
namespace: acme.pipelines
description: Configurable pipeline with middleware support
variables:
pipeline_stages: ["validate", "transform", "enrich", "filter", "output"]
middleware_enabled: ["logging", "metrics", "validation", "caching"]
tasks:
# Pipeline orchestrator
- id: pipeline_orchestrator
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: load_data
type: io.kestra.plugin.core.http.Download
uri: "https://api.acme.com/data"
store: true
# Dynamic pipeline stages
- id: execute_pipeline_stages
type: io.kestra.plugin.core.flow.EachSequential
value: "{{ vars.pipeline_stages }}"
tasks:
- id: "execute_{{ task.value }}"
type: io.kestra.plugin.core.flow.Switch
value: "{{ task.value }}"
cases:
validate:
- id: run_validation
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.pipeline.middleware
flowId: validation-middleware
wait: true
inputs:
data: "{{ outputs.load_data.uri }}"
transform:
- id: run_transformation
type: io.kestra.plugin.scripts.python.Script
inputFiles:
data.json: "{{ outputs.execute_validation.outputs.validated_data | default(outputs.load_data.uri) }}"
script: |
import json
import pandas as pd
with open('data.json', 'r') as f:
data = json.load(f)
# Transformation logic
df = pd.DataFrame(data)
# ... transformation steps
df.to_parquet('transformed.parquet', index=False)
print("Transformation complete")
enrich:
- id: run_enrichment
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.pipeline.middleware
flowId: enrichment-middleware
wait: true
filter:
- id: run_filtering
type: io.kestra.plugin.scripts.python.Script
script: |
# Filtering logic
print("Filtering complete")
output:
- id: produce_output
type: io.kestra.plugin.core.debug.Return
format: "Pipeline execution complete"
# Apply middleware after each stage
- id: "apply_middleware_{{ task.value }}"
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: "logging_middleware_{{ task.value }}"
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.pipeline.middleware
flowId: logging-middleware
wait: false
inputs:
stage: "{{ task.value }}"
execution_id: "{{ execution.id }}"
runIf: "{{ vars.middleware_enabled contains 'logging' }}"
- id: "metrics_middleware_{{ task.value }}"
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.pipeline.middleware
flowId: metrics-middleware
wait: false
runIf: "{{ vars.middleware_enabled contains 'metrics' }}"
Middleware Implementation
id: logging-middleware
namespace: acme.pipeline.middleware
description: Logging middleware for pipeline stages
tasks:
- id: log_stage_execution
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
INSERT INTO pipeline_logs
(execution_id, stage, status, start_time, end_time, duration_ms)
VALUES (
'{{ inputs.execution_id }}',
'{{ inputs.stage }}',
'{{ parent.state.current }}',
'{{ parent.startDate }}',
'{{ now() }}',
EXTRACT(EPOCH FROM (NOW() - '{{ parent.startDate }}'::timestamptz)) * 1000
)
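The core idea of this pattern — wrap every stage with the enabled cross-cutting concerns before running it — can be sketched outside Kestra as plain function composition. A minimal Python sketch (all names are illustrative):

```python
import time
from functools import reduce

def logging_middleware(next_stage):
    """Logs stage input and output, like the logging-middleware subflow."""
    def wrapper(data):
        print(f"stage input: {data!r}")
        result = next_stage(data)
        print(f"stage output: {result!r}")
        return result
    return wrapper

def metrics_middleware(next_stage):
    """Times each stage, like the metrics-middleware subflow."""
    def wrapper(data):
        start = time.perf_counter()
        result = next_stage(data)
        print(f"stage took {(time.perf_counter() - start) * 1000:.2f} ms")
        return result
    return wrapper

def run_pipeline(stages, middleware, data):
    for stage in stages:
        # Wrap each stage in every enabled middleware before running it,
        # mirroring the apply_middleware_* tasks in the flow above.
        wrapped = reduce(lambda s, mw: mw(s), middleware, stage)
        data = wrapped(data)
    return data

stages = [lambda d: d + ["validated"], lambda d: d + ["transformed"]]
print(run_pipeline(stages, [logging_middleware, metrics_middleware], []))
# → ['validated', 'transformed']
```

The middleware list plays the same role as the `middleware_enabled` variable: adding or removing a wrapper changes the cross-cutting behavior without touching any stage.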
Pattern 9: Bulkhead Pattern for Fault Isolation
Isolating Failures with Resource Pools
id: bulkhead-pattern
namespace: acme.resilience
description: Isolate failures using resource pools
variables:
# Different resource pools for different services
resource_pools:
payment_service:
max_concurrent: 5
timeout: PT30S
inventory_service:
max_concurrent: 3
timeout: PT20S
shipping_service:
max_concurrent: 2
timeout: PT60S
tasks:
# Resource pool manager
- id: acquire_resource_slot
type: io.kestra.plugin.core.cache.Increment
key: "resource_pool:{{ inputs.service_name }}:active"
defaultValue: 0
ttl: PT5M
- id: check_pool_capacity
type: io.kestra.plugin.core.debug.Return
format: >
{{ outputs.acquire_resource_slot.value <=
vars.resource_pools[inputs.service_name].max_concurrent }}
- id: execute_with_resource_limit
type: io.kestra.plugin.core.flow.Switch
value: "{{ inputs.service_name }}"
cases:
payment_service:
- id: call_payment_service
type: io.kestra.plugin.core.http.Request
uri: "https://payments.acme.com/api/process"
method: POST
body: "{{ inputs.request_data }}"
timeout: "{{ vars.resource_pools.payment_service.timeout }}"
runIf: "{{ outputs.check_pool_capacity.value }}"
inventory_service:
- id: call_inventory_service
type: io.kestra.plugin.core.http.Request
uri: "https://inventory.acme.com/api/check"
method: POST
body: "{{ inputs.request_data }}"
timeout: "{{ vars.resource_pools.inventory_service.timeout }}"
runIf: "{{ outputs.check_pool_capacity.value }}"
- id: pool_capacity_exceeded
type: io.kestra.plugin.core.debug.Return
format: "Resource pool capacity exceeded for {{ inputs.service_name }}"
runIf: "{{ not outputs.check_pool_capacity.value }}"
# Release resource slot
- id: release_resource_slot
type: io.kestra.plugin.core.cache.Decrement
key: "resource_pool:{{ inputs.service_name }}:active"
runIf: >
{{ outputs.execute_with_resource_limit != null or
outputs.pool_capacity_exceeded != null }}
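The acquire / check / release dance above is exactly what a semaphore gives you in ordinary code. Here is a hedged Python sketch of the bulkhead idea (class and pool names are illustrative, not a Kestra API):

```python
import threading

class Bulkhead:
    """Reject work once a pool's concurrency limit is reached, mirroring
    the acquire_resource_slot / check_pool_capacity / release steps above."""

    def __init__(self, max_concurrent: int):
        self._slots = threading.Semaphore(max_concurrent)

    def call(self, fn, *args, **kwargs):
        # Non-blocking acquire: fail fast instead of queueing, which is
        # what keeps one saturated pool from starving the others.
        if not self._slots.acquire(blocking=False):
            raise RuntimeError("resource pool capacity exceeded")
        try:
            return fn(*args, **kwargs)
        finally:
            self._slots.release()

payment_pool = Bulkhead(max_concurrent=2)
print(payment_pool.call(lambda: "payment processed"))  # payment processed
```

Each service gets its own `Bulkhead` instance, so a slow shipping service can exhaust its own two slots without touching the payment pool's five.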
Pattern 10: Ambassador Pattern for Service Abstraction
Abstracting External Services
id: payment-service-ambassador
namespace: acme.ambassadors
description: Ambassador pattern for payment service abstraction
tasks:
# Service discovery and health check
- id: discover_payment_endpoints
type: io.kestra.plugin.core.http.Request
uri: "https://service-discovery.acme.com/api/endpoints/payment"
method: GET
store: true
- id: select_healthy_endpoint
type: io.kestra.plugin.scripts.python.Script
inputFiles:
endpoints.json: "{{ outputs.discover_payment_endpoints.uri }}"
script: |
import json
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
with open('endpoints.json', 'r') as f:
endpoints = json.load(f)
def check_endpoint_health(endpoint):
try:
response = requests.get(
f"{endpoint['url']}/health",
timeout=2
)
return {
'endpoint': endpoint,
'healthy': response.status_code == 200,
'response_time': response.elapsed.total_seconds()
}
except Exception:
return {
'endpoint': endpoint,
'healthy': False,
'response_time': float('inf')
}
# Check all endpoints in parallel
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(check_endpoint_health, ep) for ep in endpoints]
results = [f.result() for f in as_completed(futures)]
# Select the healthiest endpoint
healthy_endpoints = [r for r in results if r['healthy']]
if healthy_endpoints:
# Choose the fastest healthy endpoint
selected = min(healthy_endpoints, key=lambda x: x['response_time'])
print(json.dumps(selected['endpoint']))
# Expose the URL so the next task can read
# outputs.select_healthy_endpoint.vars.url
from kestra import Kestra
Kestra.outputs({'url': selected['endpoint']['url']})
else:
raise Exception("No healthy payment endpoints available")
# Circuit breaker and retry logic
- id: call_payment_service_with_retry
type: io.kestra.plugin.core.http.Request
uri: "{{ outputs.select_healthy_endpoint.vars.url }}/api/process"
method: POST
body: "{{ inputs.payment_data }}"
retry:
type: exponential
maxAttempt: 3
interval: PT1S
maxDuration: PT10S
# Response transformation
- id: transform_payment_response
type: io.kestra.plugin.scripts.python.Script
inputFiles:
response.json: "{{ outputs.call_payment_service_with_retry.uri }}"
script: |
import json
with open('response.json', 'r') as f:
response = json.load(f)
# Standardize response format
standardized = {
'success': response.get('status') == 'success',
'transaction_id': response.get('id'),
'amount': response.get('amount'),
'currency': response.get('currency', 'USD'),
'timestamp': response.get('created'),
'provider': 'payment_service',
'metadata': {
'original_response': response
}
}
print(json.dumps(standardized, indent=2))
# Expose the standardized payload for downstream tasks
# (outputs.transform_payment_response.vars)
from kestra import Kestra
Kestra.outputs(standardized)
# Cache successful responses
- id: cache_payment_response
type: io.kestra.plugin.redis.Put
key: "payment:{{ inputs.payment_data.transaction_id }}"
value: "{{ outputs.transform_payment_response.vars }}"
ttl: PT5M
runIf: "{{ outputs.transform_payment_response.vars.success == true }}"
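The endpoint selection in the ambassador's health-check script is worth isolating, because it is where availability and latency trade off. A minimal Python sketch of that selection logic (endpoint URLs are made up for illustration):

```python
def select_endpoint(health_results):
    """Pick the fastest healthy endpoint, as the ambassador's
    health-check script does; fail loudly when none are healthy."""
    healthy = [r for r in health_results if r["healthy"]]
    if not healthy:
        raise RuntimeError("No healthy payment endpoints available")
    return min(healthy, key=lambda r: r["response_time"])["endpoint"]

results = [
    {"endpoint": {"url": "https://pay-a.internal"}, "healthy": True, "response_time": 0.12},
    {"endpoint": {"url": "https://pay-b.internal"}, "healthy": False, "response_time": float("inf")},
    {"endpoint": {"url": "https://pay-c.internal"}, "healthy": True, "response_time": 0.05},
]
print(select_endpoint(results))  # {'url': 'https://pay-c.internal'}
```

Raising when no endpoint is healthy (rather than silently falling back) is what lets the calling flow's retry and circuit-breaker machinery take over.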
Putting It All Together: A Complete Advanced Workflow
id: advanced-order-processing
namespace: acme.production
description: Complete advanced workflow combining multiple patterns
tasks:
# Pattern 1: Circuit Breaker for external services
- id: check_service_health
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: check_payment_service
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.resilience
flowId: circuit-breaker-check
wait: true
inputs:
service_name: "payment_service"
- id: check_inventory_service
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.resilience
flowId: circuit-breaker-check
wait: true
inputs:
service_name: "inventory_service"
# Pattern 2: Bulkhead for resource isolation
- id: process_with_bulkheads
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: validate_order_bulkhead
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.resilience
flowId: bulkhead-processor
wait: true
inputs:
service_name: "validation"
request_data: "{{ inputs.order_data }}"
- id: reserve_inventory_bulkhead
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.resilience
flowId: bulkhead-processor
wait: true
inputs:
service_name: "inventory"
request_data: "{{ inputs.order_data }}"
runIf: >
{{ outputs.check_service_health.outputs.check_inventory_service.outputs.circuit_state != 'open' }}
# Pattern 3: Saga pattern for distributed transaction
- id: execute_order_saga
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: saga_step1_payment
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.ambassadors
flowId: payment-service-ambassador
wait: true
inputs:
payment_data: "{{ inputs.payment_data }}"
- id: saga_step2_fulfillment
type: io.kestra.plugin.core.http.Request
uri: "https://fulfillment.acme.com/api/process"
method: POST
body: |
{
"order_id": "{{ execution.id }}",
"items": {{ inputs.order_data.items | toJson }},
"payment_reference": "{{ outputs.saga_step1_payment.outputs.transaction_id }}"
}
- id: saga_step3_notification
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: notify_customer
type: io.kestra.plugin.notifications.email.EmailSend
to: "{{ inputs.customer_email }}"
subject: "Order Confirmation #{{ execution.id }}"
htmlBody: |
<h1>Thank you for your order!</h1>
<p>Order ID: {{ execution.id }}</p>
- id: notify_internal
type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
url: "{{ secret('SLACK_ORDERS_WEBHOOK') }}"
payload: |
{
"text": "New order processed: {{ execution.id }}"
}
# Pattern 4: Event sourcing for audit trail
- id: record_order_events
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: record_order_created
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
INSERT INTO order_events
(event_id, order_id, event_type, event_data, occurred_at)
VALUES (
'{{ execution.id }}_created',
'{{ execution.id }}',
'ORDER_CREATED',
'{{ inputs.order_data | toJson }}',
NOW()
)
- id: record_order_processing
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
INSERT INTO order_events
(event_id, order_id, event_type, event_data, occurred_at)
VALUES (
'{{ execution.id }}_processing',
'{{ execution.id }}',
'ORDER_PROCESSING',
'{{ outputs.execute_order_saga | toJson }}',
NOW()
)
# Pattern 5: CQRS update for read models
- id: update_read_models
type: io.kestra.plugin.core.flow.Parallel
wait: false # Async updates
tasks:
- id: update_order_summary
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
INSERT INTO order_summaries
(order_id, customer_id, total_amount, status, items_count, created_at)
VALUES (
'{{ execution.id }}',
'{{ inputs.customer_id }}',
{{ inputs.order_data.total }},
'processed',
{{ inputs.order_data.items | length }},
NOW()
)
- id: update_customer_metrics
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
UPDATE customer_metrics
SET
total_orders = total_orders + 1,
total_spent = total_spent + {{ inputs.order_data.total }},
last_order_at = NOW()
WHERE customer_id = '{{ inputs.customer_id }}'
# Pattern 6: Sidecar for monitoring
- id: attach_monitoring_sidecar
type: io.kestra.plugin.core.flow.Subflow
namespace: acme.sidecars
flowId: monitoring-sidecar
wait: false
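The saga in this workflow shows only the happy path; the compensation logic it implies (reverse payment, release inventory) is the part that makes it a saga. A minimal Python sketch of the general mechanics, with made-up step names:

```python
def run_saga(steps):
    """Run (action, compensation) pairs in order; on failure, run the
    compensations for already-completed steps in reverse order."""
    completed = []
    try:
        for action, compensate in steps:
            action()
            completed.append(compensate)
    except Exception:
        for compensate in reversed(completed):
            compensate()
        raise  # still surface the failure to the caller

def fail_shipping():
    raise RuntimeError("shipping unavailable")

log = []
steps = [
    (lambda: log.append("charge card"), lambda: log.append("refund card")),
    (lambda: log.append("reserve stock"), lambda: log.append("release stock")),
    (fail_shipping, lambda: log.append("cancel shipment")),
]
try:
    run_saga(steps)
except RuntimeError:
    pass
print(log)  # ['charge card', 'reserve stock', 'release stock', 'refund card']
```

Note that compensations run in reverse order of completion, and the failed step's own compensation never runs — it never completed.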
Best Practices for Advanced Patterns
1. Pattern Selection Guide
For fault tolerance: Circuit Breaker, Bulkhead, Retry
For distributed transactions: Saga, Event Sourcing
For migration: Strangler Fig, Dual Write
For scalability: CQRS, Sharding, Partitioning
For observability: Sidecar, Ambassador
2. Testing Advanced Patterns
# Test circuit breaker
id: test-circuit-breaker
namespace: acme.tests
tasks:
- id: simulate_failures
type: io.kestra.plugin.core.flow.EachSequential
value: "{{ range(1, 6) }}"
tasks:
- id: call_failing_service
type: io.kestra.plugin.core.http.Request
uri: "http://mock-service/fail"
allowFailure: true
- id: verify_circuit_tripped
type: io.kestra.plugin.core.cache.Get
key: "circuit_breaker:mock-service"
- id: assert_circuit_open
type: io.kestra.plugin.core.execution.Fail
errorMessage: "Expected circuit breaker to be open"
runIf: '{{ outputs.verify_circuit_tripped.value.state != "open" }}'
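The state machine this test exercises is small enough to sketch in plain Python. This is a hedged, minimal model of the circuit breaker being tested — not Kestra's implementation — trips open after a threshold of consecutive failures, rejects calls while open, and allows a trial call once the reset timeout elapses:

```python
import time

class CircuitBreaker:
    """Minimal circuit breaker: closed → open after `threshold`
    consecutive failures → half-open after `reset_timeout` seconds."""

    def __init__(self, threshold=5, reset_timeout=300.0, clock=time.monotonic):
        self.threshold = threshold
        self.reset_timeout = reset_timeout
        self.clock = clock          # injectable for testing
        self.failures = 0
        self.opened_at = None

    @property
    def state(self):
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_timeout:
            return "half-open"      # allow one trial call through
        return "open"

    def call(self, fn):
        if self.state == "open":
            raise RuntimeError("circuit open")
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = self.clock()
            raise
        self.failures = 0           # any success fully resets the breaker
        self.opened_at = None
        return result

cb = CircuitBreaker(threshold=5)
for _ in range(5):  # five failures, as simulated in the test flow
    try:
        cb.call(lambda: 1 / 0)
    except ZeroDivisionError:
        pass
print(cb.state)  # open
```

Injecting the clock is what makes the half-open transition testable without actually sleeping for the reset timeout.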
3. Monitoring and Observability
id: pattern-metrics-collector
namespace: acme.monitoring
tasks:
- id: collect_pattern_metrics
type: io.kestra.plugin.scripts.python.Script
script: |
# Collect metrics for each pattern (the count_* / calculate_* helpers
# below are placeholders, defined elsewhere in your codebase)
metrics = {
'circuit_breaker': {
'open_circuits': count_open_circuits(),
'tripped_today': count_trips_today()
},
'saga': {
'active_sagas': count_active_sagas(),
'compensation_rate': calculate_compensation_rate()
},
'cqrs': {
'read_lag_ms': calculate_read_lag(),
'write_throughput': calculate_write_throughput()
}
}
4. Documentation Strategy
# Annotate patterns in flow metadata
labels:
patterns: "circuit-breaker,saga,cqrs"
complexity: "advanced"
team: "platform-engineering"
documentation: |
## Applied Patterns
### Circuit Breaker
- Prevents cascading failures from payment service
- Config: 5 failures trips circuit, 5 minute timeout
### Saga Pattern
- Manages distributed order processing
- Compensation: reverse payment, release inventory
### CQRS
- Separate read/write models for orders
- Eventual consistency: < 1 second lag
Common Anti-Patterns to Avoid
1. Over-Engineering
# BAD: Using saga for simple local transaction
# GOOD: Use simple sequential tasks for local operations
2. Pattern Misapplication
# BAD: Using event sourcing for simple CRUD
# GOOD: Event sourcing for audit-critical domains only
3. Ignoring Consistency Boundaries
# BAD: Distributed transaction across 10 microservices
# GOOD: Saga within bounded context, eventual consistency between
4. Neglecting Monitoring
# BAD: Complex patterns without observability
# GOOD: Comprehensive metrics for each pattern
Conclusion: Becoming a Workflow Architect
You've now explored the most powerful patterns in workflow orchestration. Remember:
Patterns are tools, not goals - Use them to solve specific problems
Start simple, add complexity gradually - Don't over-engineer from day one
Monitor everything - Complex patterns need comprehensive observability
Document decisions - Future you (and your team) will thank you
Test thoroughly - Edge cases multiply with complexity
Your Advanced Challenge:
Pick one complex business problem and solve it with multiple patterns:
E-commerce checkout with inventory, payment, and fulfillment
Data pipeline with validation, transformation, and quality checks
User onboarding with verification, provisioning, and welcome sequence
Report generation with data collection, processing, and distribution
System migration from monolith to microservices
Share your architecture in the comments!
In the next article, we'll explore Kestra's Plugin Ecosystem - how to extend Kestra with custom functionality and integrate with any system.
Before the next article, try:
Implement one new pattern in an existing workflow
Create a reusable pattern template for your team
Set up monitoring for pattern-specific metrics
Refactor a complex workflow using appropriate patterns
Document your pattern decisions and trade-offs
Remember: Mastery comes from practice, not just knowledge. Build, iterate, and learn from each implementation.
Happy orchestrating! 🚀