Article 6: Mastering Kestra's Plugin Ecosystem.
Extending Kestra to Every Corner of Your Data Stack.
Introduction: The Power of Plugins
Imagine you're a master chef. You don't just have one knife - you have specialized tools for every task: a paring knife for delicate work, a chef's knife for chopping, a boning knife for meat, and a bread knife for, well, bread. That's what Kestra's plugin ecosystem gives you: specialized tools for every data engineering task.
In this article, you'll learn to:
Navigate Kestra's vast plugin library (150+ plugins and counting)
Create your own custom plugins for proprietary systems
Build plugin-based architectures for enterprise integration
Debug and optimize plugin performance
Contribute to the open-source plugin ecosystem
Quick Stats:
150+ official plugins available
1,000+ companies using Kestra plugins
10,000+ plugin downloads per week
0 lines of code needed for most integrations
Part 1: The Plugin Architecture
Understanding Plugin Layers
Kestra's plugin architecture is like a LEGO system:
Application Layer (Your Flows)
↓
Plugin Interface (Java Interfaces)
↓
Plugin Implementation (Built-in/Custom)
↓
Target System (Databases, APIs, Cloud Services)
Core Plugin Types
# Every plugin belongs to one of these categories
plugins:
  - Input/Output:
      - Filesystems: S3, GCS, Azure Blob, Local, FTP
      - Databases: PostgreSQL, MySQL, Snowflake, BigQuery
      - APIs: REST, GraphQL, SOAP
  - Processing:
      - Scripting: Python, R, Node.js, Bash
      - Data Processing: Spark, Flink, Pandas
      - ML/AI: TensorFlow, PyTorch, Hugging Face
  - Messaging:
      - Queues: Kafka, RabbitMQ, AWS SQS
      - Streaming: Kafka Streams, Apache Pulsar
  - Cloud Services:
      - AWS: S3, Lambda, Glue, Redshift
      - GCP: BigQuery, Cloud Functions, Dataflow
      - Azure: Blob Storage, Data Factory, Synapse
  - Monitoring:
      - Metrics: Prometheus, Datadog, New Relic
      - Logging: ELK, Splunk, CloudWatch
      - Alerting: Slack, PagerDuty, Email
  - Custom:
      - Enterprise Systems: SAP, Salesforce, Oracle
      - Internal APIs: Your company's systems
      - Legacy Systems: Mainframes, AS/400
Plugin Anatomy: What Makes a Good Plugin?
// The anatomy of a Kestra plugin (simplified)
public class MyPlugin extends Task implements RunnableTask<MyPlugin.Output> {
    // 1. Configuration (exposed as YAML properties)
    @PluginProperty
    private String connectionString;

    @PluginProperty
    private Integer timeout;

    // 2. Execution logic
    @Override
    public Output run(RunContext runContext) throws Exception {
        // Your plugin logic here
        return Output.builder().result("success").build();
    }

    // 3. Typed output exposed to downstream tasks
    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        private final String result;
    }
}
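In a flow, the `@PluginProperty` fields above surface one-to-one as YAML keys on the task. A sketch, assuming the placeholder `MyPlugin` class lives in a `com.example` package:

```yaml
tasks:
  - id: my_task
    type: com.example.MyPlugin   # fully qualified class name of the task
    connectionString: jdbc:example://host:5432/db
    timeout: 30
```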
Part 2: Using Built-in Plugins
Plugin Discovery and Installation
# List available plugins
kestra plugins list
# Search for specific plugins
kestra plugins search --query "postgres"
# Install a plugin
kestra plugins install kestra-plugin-jdbc-postgresql
# View installed plugins
kestra plugins installed
# Update all plugins
kestra plugins update-all
Database Plugins Deep Dive
# flows/database-demo.yml
id: database-plugin-showcase
namespace: demo.plugins
description: Showcase of database plugins

tasks:
  # PostgreSQL - Full-featured SQL database
  - id: postgres_example
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: jdbc:postgresql://localhost:5432/mydb
    username: "{{ secret('PG_USER') }}"
    password: "{{ secret('PG_PASS') }}"
    sql: |
      -- Parameterized query
      SELECT * FROM users
      WHERE created_at > '{{ execution.startDate | dateAdd(-7, 'DAYS') }}'
        AND status = '{{ inputs.user_status }}'
    fetchSize: 1000
    # store (rather than fetch) writes results to internal storage,
    # which is what the downstream join task reads via the output URI
    store: true

  # Snowflake - Cloud data warehouse
  - id: snowflake_example
    type: io.kestra.plugin.jdbc.snowflake.Query
    url: jdbc:snowflake://account.snowflakecomputing.com
    username: "{{ secret('SNOWFLAKE_USER') }}"
    password: "{{ secret('SNOWFLAKE_PASS') }}"
    role: SYSADMIN
    warehouse: COMPUTE_WH
    database: PRODUCTION
    schema: ANALYTICS
    sql: |
      COPY INTO @my_stage/users.parquet
      FROM (
        SELECT * FROM users
        WHERE DAY = '{{ execution.startDate | date('yyyy-MM-dd') }}'
      )
      FILE_FORMAT = (TYPE = PARQUET)
    store: true

  # BigQuery - Serverless data warehouse
  - id: bigquery_example
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      SELECT
        user_id,
        COUNT(*) as session_count,
        SUM(duration) as total_duration
      FROM `project.dataset.sessions`
      WHERE DATE(timestamp) = '{{ execution.startDate | date('yyyy-MM-dd') }}'
      GROUP BY user_id
    destinationTable:
      projectId: my-project
      datasetId: results
      tableId: daily_sessions
    writeDisposition: WRITE_TRUNCATE
    createDisposition: CREATE_IF_NEEDED

  # ClickHouse - OLAP database
  - id: clickhouse_example
    type: io.kestra.plugin.jdbc.clickhouse.Query
    url: jdbc:clickhouse://localhost:8123/analytics
    sql: |
      SELECT
        toStartOfDay(timestamp) as day,
        uniq(user_id) as daily_active_users,
        count(*) as events
      FROM events
      WHERE timestamp >= '{{ execution.startDate | dateAdd(-30, 'DAYS') }}'
      GROUP BY day
      ORDER BY day
    fetch: true

  # Multi-database Join (Virtual Data Warehouse)
  - id: cross_database_join
    type: io.kestra.plugin.scripts.python.Script
    description: "Join data from multiple databases"
    inputFiles:
      postgres_data.json: "{{ outputs.postgres_example.uri }}"
      snowflake_data.json: "{{ outputs.snowflake_example.uri }}"
      bigquery_data.json: "{{ outputs.bigquery_example.uri }}"
    script: |
      import pandas as pd
      import json

      # Load from different sources
      pg_df = pd.read_json('postgres_data.json')
      sf_df = pd.read_json('snowflake_data.json')
      bq_df = pd.read_json('bigquery_data.json')

      # Perform cross-database join
      merged = pd.merge(
          pg_df,
          sf_df,
          on='user_id',
          how='left'
      ).merge(
          bq_df,
          on='user_id',
          how='left'
      )

      # Save result
      merged.to_parquet('cross_database_result.parquet', index=False)
      print(f"Merged {len(merged)} records from 3 databases")
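Stripped of pandas, the two chained left joins in the script above reduce to a dictionary lookup per key. This standalone sketch (with made-up rows standing in for the three query outputs) shows the merge semantics, including the `None` a left join produces for unmatched rows:

```python
# Toy stand-ins for the three database outputs (hypothetical data)
pg_rows = [{"user_id": 1, "name": "Ada"}, {"user_id": 2, "name": "Ben"}]
sf_rows = [{"user_id": 1, "spend": 120.0}]
bq_rows = [{"user_id": 1, "sessions": 7}, {"user_id": 2, "sessions": 3}]

def left_join(left, right, key):
    """Left-join two lists of dicts on `key`; unmatched rows get None fields."""
    index = {row[key]: row for row in right}
    fields = {k for row in right for k in row} - {key}
    return [
        {**row, **{f: index.get(row[key], {}).get(f) for f in fields}}
        for row in left
    ]

merged = left_join(left_join(pg_rows, sf_rows, "user_id"), bq_rows, "user_id")
print(merged)
```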
Cloud Storage Plugins
# flows/cloud-storage.yml
id: cloud-storage-operations
namespace: demo.plugins

tasks:
  # AWS S3 Operations
  - id: s3_operations
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: upload_to_s3
        type: io.kestra.plugin.aws.s3.Upload
        from: "data/output.parquet"
        bucket: my-data-lake
        key: "raw/{{ execution.startDate | date('yyyy/MM/dd') }}/data.parquet"
        region: us-east-1
        accessKeyId: "{{ secret('AWS_ACCESS_KEY') }}"
        secretKeyId: "{{ secret('AWS_SECRET_KEY') }}"

      - id: list_s3_files
        type: io.kestra.plugin.aws.s3.List
        bucket: my-data-lake
        region: us-east-1
        prefix: "raw/{{ execution.startDate | date('yyyy/MM/dd') }}/"

      - id: sync_s3_to_local
        type: io.kestra.plugin.aws.s3.Download
        bucket: my-data-lake
        key: "{{ outputs.list_s3_files.uris[0] }}"
        region: us-east-1

  # Google Cloud Storage
  - id: gcs_operations
    type: io.kestra.plugin.gcp.gcs.Upload
    from: "data/processed.parquet"
    bucket: my-gcp-bucket
    name: "processed/{{ execution.startDate | date('yyyy-MM-dd') }}.parquet"
    projectId: my-gcp-project

  # Azure Blob Storage
  - id: azure_operations
    type: io.kestra.plugin.azure.blob.Upload
    from: "data/archive.parquet"
    connectionString: "{{ secret('AZURE_CONNECTION_STRING') }}"
    container: my-container
    name: "archive/{{ execution.startDate | date('yyyy/MM') }}/data.parquet"

  # Multi-cloud synchronization
  - id: multi_cloud_sync
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: sync_to_aws
        type: io.kestra.plugin.aws.s3.Copy
        from: "{{ outputs.gcs_operations.uri }}"
        to: "s3://my-data-lake/gcs-mirror/{{ execution.startDate | date('yyyy-MM-dd') }}.parquet"

      - id: sync_to_azure
        type: io.kestra.plugin.azure.blob.Copy
        from: "{{ outputs.gcs_operations.uri }}"
        to: "https://myaccount.blob.core.windows.net/mycontainer/gcs-mirror/data.parquet"
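All three uploads share the same convention: object keys partitioned by execution date (`raw/yyyy/MM/dd/...`), which keeps downstream listing and lifecycle rules cheap. The key construction itself is plain string formatting; the helper name here is made up for illustration:

```python
from datetime import date

def partition_key(prefix: str, day: date, filename: str) -> str:
    """Build a date-partitioned object key like raw/2024/03/05/data.parquet."""
    return f"{prefix}/{day:%Y/%m/%d}/{filename}"

key = partition_key("raw", date(2024, 3, 5), "data.parquet")
print(key)  # raw/2024/03/05/data.parquet
```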
Messaging and Streaming Plugins
# flows/messaging-demo.yml
id: messaging-patterns
namespace: demo.plugins

tasks:
  # Kafka Producer
  - id: produce_to_kafka
    type: io.kestra.plugin.kafka.Produce
    topic: user-events
    key: "{{ inputs.user_id }}"
    value: |
      {
        "event_type": "{{ inputs.event_type }}",
        "user_id": "{{ inputs.user_id }}",
        "timestamp": "{{ now() }}",
        "properties": {{ inputs.properties | toJson }}
      }
    properties:
      bootstrap.servers: "kafka-broker:9092"
      acks: "all"
      compression.type: "snappy"
    serializer: STRING

  # Kafka Consumer (Stream Processing)
  - id: consume_from_kafka
    type: io.kestra.plugin.kafka.Consume
    topic: user-events
    groupId: "etl-processor-{{ execution.startDate | date('yyyy-MM-dd') }}"
    maxRecords: 1000
    timeout: PT1M
    properties:
      bootstrap.servers: "kafka-broker:9092"
      auto.offset.reset: "earliest"

  # RabbitMQ
  - id: rabbitmq_example
    type: io.kestra.plugin.amqp.Write
    uri: "amqp://user:pass@rabbitmq:5672"
    exchange: events
    routingKey: "user.{{ inputs.user_type }}"
    content: |
      {{ inputs.event_data | toJson }}
    contentType: "application/json"

  # AWS SQS
  - id: sqs_example
    type: io.kestra.plugin.aws.sqs.Send
    queueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
    messageBody: |
      {{ inputs.message | toJson }}
    region: us-east-1

  # Real-time Data Pipeline
  - id: real_time_pipeline
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: consume_stream
        type: io.kestra.plugin.kafka.Consume
        topic: clickstream
        groupId: real-time-processor
        maxRecords: 100
        timeout: PT10S

      - id: process_in_realtime
        type: io.kestra.plugin.scripts.python.Script
        inputFiles:
          messages.json: "{{ outputs.consume_stream.uri }}"
        script: |
          import json
          import pandas as pd
          from datetime import datetime

          with open('messages.json', 'r') as f:
              messages = json.load(f)

          # Real-time processing
          events = []
          for msg in messages:
              event = json.loads(msg['value'])
              # Enrich in real-time
              event['processed_at'] = datetime.now().isoformat()
              event['hour_of_day'] = datetime.fromisoformat(event['timestamp']).hour
              event['is_peak'] = 9 <= event['hour_of_day'] <= 17
              events.append(event)

          # Batch write for efficiency
          if events:
              df = pd.DataFrame(events)
              df.to_parquet(f'processed_{datetime.now().strftime("%H%M%S")}.parquet')
              print(f"Processed {len(events)} events in real-time")

      - id: produce_results
        # Inside a Sequential block this task only runs when the previous
        # task succeeded, so no extra success condition is needed
        type: io.kestra.plugin.kafka.Produce
        topic: processed-events
        key: "{{ execution.startDate | date('yyyy-MM-dd HH') }}"
        value: "{{ outputs.process_in_realtime.outputFiles | toJson }}"
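The enrichment inside `process_in_realtime` is a pure function of the event, so it can be pulled out and unit-tested without Kafka or Kestra. A sketch of the same logic:

```python
from datetime import datetime

def enrich(event: dict, now: datetime) -> dict:
    """Add processing metadata and a business-hours flag, as in the flow's script."""
    hour = datetime.fromisoformat(event["timestamp"]).hour
    return {
        **event,
        "processed_at": now.isoformat(),
        "hour_of_day": hour,
        "is_peak": 9 <= hour <= 17,
    }

evt = enrich({"timestamp": "2024-03-05T10:30:00", "user_id": "u1"},
             datetime(2024, 3, 5, 10, 31))
print(evt["hour_of_day"], evt["is_peak"])  # 10 True
```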
Scripting and Data Processing Plugins
# flows/scripting-demo.yml
id: multi-language-scripts
namespace: demo.plugins

tasks:
  # Python with dependencies
  - id: python_with_deps
    type: io.kestra.plugin.scripts.python.Script
    description: "Python script with custom dependencies"
    script: |
      import pandas as pd
      import numpy as np
      from scipy import stats
      import requests

      # Your data science code here
      data = pd.DataFrame({
          'x': np.random.normal(0, 1, 1000),
          'y': np.random.normal(0, 1, 1000)
      })
      correlation = data['x'].corr(data['y'])
      print(f"Correlation: {correlation:.3f}")

      # Save results
      data.to_parquet('analysis_result.parquet', index=False)
    docker:
      image: "kestra/kestra-python:latest"
      volumes:
        - /tmp:/tmp
    inputFiles:
      config.yaml: |
        dataset:
          name: "sample_data"
          version: "1.0"
    outputFiles:
      - "*.parquet"
      - "*.csv"

  # R for statistical analysis
  - id: r_analysis
    type: io.kestra.plugin.scripts.r.Script
    script: |
      library(dplyr)
      library(ggplot2)
      library(jsonlite)

      # Load data
      data <- read.csv("input_data.csv")

      # Statistical analysis
      result <- data %>%
        group_by(category) %>%
        summarise(
          mean = mean(value),
          sd = sd(value),
          n = n()
        )

      # Create visualization
      plot <- ggplot(data, aes(x = category, y = value)) +
        geom_boxplot() +
        theme_minimal()
      ggsave("boxplot.png", plot, width = 8, height = 6)

      # Output results
      write_json(result, "analysis_result.json")
      cat("Analysis complete\n")
    docker:
      image: rocker/tidyverse:latest

  # Node.js for API interactions
  - id: nodejs_api_client
    type: io.kestra.plugin.scripts.node.Script
    script: |
      const axios = require('axios');
      const fs = require('fs').promises;

      async function fetchData() {
        const response = await axios.get('https://api.example.com/data');
        const data = response.data;

        // Transform data
        const transformed = data.map(item => ({
          id: item.id,
          name: item.name.toUpperCase(),
          timestamp: new Date().toISOString()
        }));

        // Save to file
        await fs.writeFile(
          'api_data.json',
          JSON.stringify(transformed, null, 2)
        );
        console.log(`Fetched ${transformed.length} records`);
      }

      fetchData().catch(console.error);
    docker:
      image: node:18-alpine

  # Bash for system operations
  - id: bash_operations
    type: io.kestra.plugin.scripts.shell.Commands
    commands:
      - find /data -name "*.parquet" -type f -mtime -7 | wc -l
      - du -sh /data/* | sort -hr
      - echo "Processing started at $(date)"
      - tar -czf archive_$(date +%Y%m%d_%H%M%S).tar.gz /data/input
    exitOnFailed: false

  # SQL-based transformation
  - id: sql_transformation
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      -- Load data from multiple formats
      CREATE TABLE users AS
        SELECT * FROM read_parquet('users.parquet');
      CREATE TABLE orders AS
        SELECT * FROM read_csv('orders.csv');

      -- Complex transformation
      WITH user_stats AS (
        SELECT
          u.user_id,
          u.join_date,
          COUNT(o.order_id) as order_count,
          SUM(o.amount) as total_spent,
          AVG(o.amount) as avg_order_value
        FROM users u
        LEFT JOIN orders o ON u.user_id = o.user_id
        GROUP BY u.user_id, u.join_date
      )
      SELECT * FROM user_stats
      ORDER BY total_spent DESC;
    fetch: true
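The R `summarise` call and the DuckDB CTE both compute per-group mean, standard deviation, and count. The same aggregation in plain Python (standard library only) is handy for sanity-checking either version on a small sample:

```python
import statistics
from collections import defaultdict

# Tiny made-up sample: (category, value) pairs
rows = [("a", 1.0), ("a", 3.0), ("b", 2.0), ("b", 4.0), ("b", 6.0)]

groups = defaultdict(list)
for category, value in rows:
    groups[category].append(value)

# Per-group mean, sample standard deviation, and count,
# mirroring summarise(mean = ..., sd = ..., n = n())
summary = {
    cat: {
        "mean": statistics.mean(vals),
        "sd": statistics.stdev(vals) if len(vals) > 1 else 0.0,
        "n": len(vals),
    }
    for cat, vals in groups.items()
}
print(summary)
```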
Machine Learning Plugins
# flows/ml-pipeline.yml
id: ml-training-pipeline
namespace: demo.ml

tasks:
  # Data Preparation
  - id: prepare_training_data
    type: io.kestra.plugin.scripts.python.Script
    outputFiles:
      - "prepared_data.pkl"
    script: |
      import pandas as pd
      from sklearn.model_selection import train_test_split
      import joblib

      # Load and prepare data
      df = pd.read_parquet('raw_data.parquet')

      # Feature engineering
      df['feature1'] = df['value1'] * df['value2']
      df['feature2'] = df['value3'].apply(lambda x: 1 if x > 0 else 0)

      # Split data
      X = df[['feature1', 'feature2', 'value4']]
      y = df['target']
      X_train, X_test, y_train, y_test = train_test_split(
          X, y, test_size=0.2, random_state=42
      )

      # Save prepared data
      joblib.dump({
          'X_train': X_train,
          'X_test': X_test,
          'y_train': y_train,
          'y_test': y_test,
          'feature_names': X.columns.tolist()
      }, 'prepared_data.pkl')
      print(f"Prepared data: {len(X_train)} training, {len(X_test)} test samples")

  # Model Training with TensorFlow
  - id: train_tensorflow_model
    type: io.kestra.plugin.scripts.python.Script
    inputFiles:
      data.pkl: "{{ outputs.prepare_training_data.outputFiles['prepared_data.pkl'] }}"
    outputFiles:
      - "trained_model.h5"
      - "model_metrics.json"
    script: |
      import json

      import tensorflow as tf
      from tensorflow import keras
      import joblib
      import numpy as np

      # Load prepared data
      data = joblib.load('data.pkl')
      X_train, X_test, y_train, y_test = (
          data['X_train'], data['X_test'],
          data['y_train'], data['y_test']
      )

      # Build model
      model = keras.Sequential([
          keras.layers.Dense(64, activation='relu', input_shape=(X_train.shape[1],)),
          keras.layers.Dropout(0.2),
          keras.layers.Dense(32, activation='relu'),
          keras.layers.Dense(1, activation='sigmoid')
      ])
      model.compile(
          optimizer='adam',
          loss='binary_crossentropy',
          metrics=['accuracy']
      )

      # Train model
      history = model.fit(
          X_train, y_train,
          validation_split=0.2,
          epochs=10,
          batch_size=32,
          verbose=1
      )

      # Evaluate
      test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
      print(f"Test accuracy: {test_acc:.4f}")

      # Save model
      model.save('trained_model.h5')

      # Save metrics
      metrics = {
          'test_accuracy': float(test_acc),
          'test_loss': float(test_loss),
          'training_history': history.history
      }
      with open('model_metrics.json', 'w') as f:
          json.dump(metrics, f)
    docker:
      image: tensorflow/tensorflow:2.13.0
      gpu: true  # Enable GPU if available

  # Model Serving with Triton
  - id: deploy_model_triton
    type: io.kestra.plugin.triton.DeployModel
    modelRepository: "/models"
    modelName: "customer-churn"
    modelVersion: "1"
    modelFile: "{{ outputs.train_tensorflow_model.outputFiles['trained_model.h5'] }}"
    config: |
      platform: "tensorflow_savedmodel"
      max_batch_size: 64
      input [
        {
          name: "input"
          data_type: TYPE_FP32
          dims: [ -1, 3 ]
        }
      ]
      output [
        {
          name: "output"
          data_type: TYPE_FP32
          dims: [ -1, 1 ]
        }
      ]

  # ML Monitoring with Evidently
  - id: monitor_model_drift
    type: io.kestra.plugin.scripts.python.Script
    script: |
      import pandas as pd
      import numpy as np
      from evidently.report import Report
      from evidently.metric_preset import DataDriftPreset

      # Load reference data (training data)
      ref_data = pd.read_parquet('reference_data.parquet')
      # Load current data (production data)
      curr_data = pd.read_parquet('current_data.parquet')

      # Generate drift report
      drift_report = Report(metrics=[DataDriftPreset()])
      drift_report.run(
          reference_data=ref_data,
          current_data=curr_data
      )

      # Save report
      drift_report.save_html('data_drift_report.html')

      # Extract metrics (dataset_drift is a boolean flag)
      drift_metrics = drift_report.as_dict()
      dataset_drift = drift_metrics['metrics'][0]['result']['dataset_drift']
      print(f"Data drift detected: {dataset_drift}")

      # Alert if drift is flagged
      if dataset_drift:
          print("ALERT: Significant data drift detected!")
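Evidently runs a battery of statistical tests per column; as a mental model (explicitly not Evidently's actual algorithm), drift detection compares the current distribution against the training-time reference. A deliberately simple mean-shift heuristic:

```python
import statistics

def mean_shift_drift(reference, current, threshold=0.5):
    """Toy drift check: flag when the mean moves by more than `threshold`
    reference standard deviations (illustration only, not Evidently's tests)."""
    ref_sd = statistics.stdev(reference)
    shift = abs(statistics.mean(current) - statistics.mean(reference))
    return shift > threshold * ref_sd

ref = [10.0, 11.0, 9.0, 10.5, 9.5]    # training-time feature values
cur = [13.0, 14.0, 12.5, 13.5, 14.5]  # production feature values
print(mean_shift_drift(ref, cur))  # True: the mean moved by several SDs
```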
Part 3: Creating Custom Plugins
Plugin Development Environment Setup
# Create plugin project
mvn archetype:generate \
-DarchetypeGroupId=io.kestra \
-DarchetypeArtifactId=kestra-plugin-archetype \
-DarchetypeVersion=0.15.0 \
-DgroupId=com.company.kestra \
-DartifactId=kestra-plugin-custom \
-Dversion=1.0.0 \
-Dpackage=com.company.kestra.plugin.custom
# Build plugin
cd kestra-plugin-custom
mvn clean package
# Install locally
mvn install
# Test plugin
mvn test
# Deploy to Kestra
cp target/kestra-plugin-custom-1.0.0.jar /path/to/kestra/plugins/
Simple Custom Plugin Example
// src/main/java/com/company/kestra/plugin/custom/HelloWorld.java
package com.company.kestra.plugin.custom;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import javax.validation.constraints.NotNull;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Plugin(
    examples = {
        @Example(
            title = "Say hello to someone",
            code = {
                "id: hello",
                "type: com.company.kestra.plugin.custom.HelloWorld",
                "name: John"
            }
        )
    }
)
@Schema(
    title = "Hello World Task",
    description = "A simple custom plugin that greets someone"
)
public class HelloWorld extends Task implements RunnableTask<HelloWorld.Output> {
    @PluginProperty(dynamic = true)
    @Schema(title = "Name to greet")
    @NotNull
    private String name;

    @PluginProperty
    @Schema(title = "Greeting message")
    private String message;

    @Override
    public Output run(RunContext runContext) throws Exception {
        // Render any {{ ... }} expressions before using the value
        String renderedName = runContext.render(name);

        String greeting = message != null ? message : "Hello";
        String fullMessage = greeting + ", " + renderedName + "!";

        // Log the greeting
        runContext.logger().info(fullMessage);

        return Output.builder()
            .greeting(fullMessage)
            .renderedName(renderedName)
            .executionTime(System.currentTimeMillis())
            .build();
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(title = "The greeting message")
        private final String greeting;

        @Schema(title = "Rendered name")
        private final String renderedName;

        @Schema(title = "Execution time in milliseconds")
        private final Long executionTime;
    }
}
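With the JAR dropped into Kestra's plugins directory, the task is usable from a flow like any built-in one. A sketch (flow id and namespace are made up; the Log task type follows the core-plugin naming used elsewhere in this article):

```yaml
id: hello-flow
namespace: demo.custom

tasks:
  - id: hello
    type: com.company.kestra.plugin.custom.HelloWorld
    name: Kestra
    message: Hi

  - id: use_output
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.hello.greeting }}"
```

Downstream tasks read the typed `Output` fields through `outputs.hello.greeting`, exactly as with official plugins.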
Database Query Plugin Example
// src/main/java/com/company/kestra/plugin/custom/CustomDatabaseQuery.java
package com.company.kestra.plugin.custom;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import javax.validation.constraints.NotNull;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Plugin(
examples = {
@Example(
title = "Query custom database",
code = {
"id: custom_query",
"type: com.company.kestra.plugin.custom.CustomDatabaseQuery",
"connectionString: jdbc:custom://host:port/db",
"username: admin",
"password: \"{{ secret('CUSTOM_DB_PASS') }}\"",
"sql: SELECT * FROM users WHERE active = true"
}
)
}
)
@Schema(title = "Custom Database Query", description = "Query a custom database system")
public class CustomDatabaseQuery extends Task implements RunnableTask<CustomDatabaseQuery.Output> {
@NotNull
@PluginProperty(dynamic = true)
@Schema(title = "JDBC connection string")
private String connectionString;
@PluginProperty(dynamic = true)
@Schema(title = "Database username")
private String username;
@PluginProperty(dynamic = true)
@Schema(title = "Database password", secret = true)
private String password;
@NotNull
@PluginProperty(dynamic = true)
@Schema(title = "SQL query to execute")
private String sql;
@PluginProperty
@Schema(title = "Fetch results (true) or execute without results (false)")
@Builder.Default
private Boolean fetch = true;
@PluginProperty
@Schema(title = "Fetch size for large results")
@Builder.Default
private Integer fetchSize = 1000;
@PluginProperty
@Schema(title = "Query timeout in seconds")
@Builder.Default
private Integer queryTimeout = 30;
@Override
public Output run(RunContext runContext) throws Exception {
// Render dynamic properties
String renderedConnectionString = runContext.render(connectionString);
String renderedUsername = runContext.render(username);
String renderedPassword = runContext.render(password);
String renderedSql = runContext.render(sql);
Connection connection = null;
PreparedStatement statement = null;
ResultSet resultSet = null;
try {
// Load custom JDBC driver
Class.forName("com.custom.Driver");
// Create connection
connection = DriverManager.getConnection(
renderedConnectionString,
renderedUsername,
renderedPassword
);
// Create statement
statement = connection.prepareStatement(renderedSql);
statement.setFetchSize(fetchSize);
statement.setQueryTimeout(queryTimeout);
if (fetch) {
// Execute query and fetch results
resultSet = statement.executeQuery();
List<Map<String, Object>> rows = new ArrayList<>();
ResultSetMetaData metaData = resultSet.getMetaData();
int columnCount = metaData.getColumnCount();
while (resultSet.next()) {
Map<String, Object> row = new HashMap<>();
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i);
Object value = resultSet.getObject(i);
row.put(columnName, value);
}
rows.add(row);
}
return Output.builder()
.rowCount(rows.size())
.rows(rows)
.query(renderedSql)
.build();
} else {
// Execute update
int affectedRows = statement.executeUpdate();
return Output.builder()
.rowCount(affectedRows)
.query(renderedSql)
.build();
}
} finally {
// Clean up resources
if (resultSet != null) {
try { resultSet.close(); } catch (SQLException ignored) {}
}
if (statement != null) {
try { statement.close(); } catch (SQLException ignored) {}
}
if (connection != null) {
try { connection.close(); } catch (SQLException ignored) {}
}
}
}
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(title = "Number of rows affected or returned")
private final Integer rowCount;
@Schema(title = "Query results (only for SELECT queries)")
private final List<Map<String, Object>> rows;
@Schema(title = "Executed SQL query")
private final String query;
}
}
REST API Plugin Example
// src/main/java/com/company/kestra/plugin/custom/CustomApiClient.java
package com.company.kestra.plugin.custom;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import okhttp3.*;
import javax.validation.constraints.NotNull;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Plugin(
examples = {
@Example(
title = "Call custom API",
code = {
"id: call_custom_api",
"type: com.company.kestra.plugin.custom.CustomApiClient",
"baseUrl: https://api.company.com/v1",
"endpoint: /users",
"method: GET",
"headers:",
" Authorization: Bearer {{ secret('API_TOKEN') }}",
"queryParams:",
" status: active",
" limit: 100"
}
)
}
)
@Schema(title = "Custom API Client", description = "Interact with a custom REST API")
public class CustomApiClient extends Task implements RunnableTask<CustomApiClient.Output> {
private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson();
private static final OkHttpClient HTTP_CLIENT = new OkHttpClient.Builder()
.connectTimeout(Duration.ofSeconds(30))
.readTimeout(Duration.ofSeconds(30))
.writeTimeout(Duration.ofSeconds(30))
.build();
@NotNull
@PluginProperty(dynamic = true)
@Schema(title = "API base URL")
private String baseUrl;
@NotNull
@PluginProperty(dynamic = true)
@Schema(title = "API endpoint")
private String endpoint;
@PluginProperty
@Schema(title = "HTTP method", defaultValue = "GET")
@Builder.Default
private Method method = Method.GET;
@PluginProperty(dynamic = true)
@Schema(title = "Request headers")
private Map<String, String> headers;
@PluginProperty(dynamic = true)
@Schema(title = "Query parameters")
private Map<String, String> queryParams;
@PluginProperty(dynamic = true)
@Schema(title = "Request body")
private Object body;
@PluginProperty
@Schema(title = "Timeout in seconds")
@Builder.Default
private Integer timeout = 30;
@PluginProperty
@Schema(title = "Store response as file")
@Builder.Default
private Boolean store = false;
public enum Method {
GET, POST, PUT, DELETE, PATCH
}
@Override
public Output run(RunContext runContext) throws Exception {
// Render dynamic properties
String renderedBaseUrl = runContext.render(baseUrl);
String renderedEndpoint = runContext.render(endpoint);
Map<String, String> renderedHeaders = renderMap(runContext, headers);
Map<String, String> renderedQueryParams = renderMap(runContext, queryParams);
Object renderedBody = renderBody(runContext, body);
// Build URL with query parameters
HttpUrl.Builder urlBuilder = HttpUrl.parse(renderedBaseUrl + renderedEndpoint).newBuilder();
if (renderedQueryParams != null) {
renderedQueryParams.forEach(urlBuilder::addQueryParameter);
}
// Build request
Request.Builder requestBuilder = new Request.Builder()
.url(urlBuilder.build());
// Add headers
if (renderedHeaders != null) {
renderedHeaders.forEach(requestBuilder::addHeader);
}
// Add body for methods that support it
if (renderedBody != null &&
(method == Method.POST || method == Method.PUT ||
method == Method.PATCH || method == Method.DELETE)) {
String bodyString;
if (renderedBody instanceof String) {
bodyString = (String) renderedBody;
requestBuilder.addHeader("Content-Type", "text/plain");
} else {
bodyString = OBJECT_MAPPER.writeValueAsString(renderedBody);
requestBuilder.addHeader("Content-Type", "application/json");
}
RequestBody requestBody = RequestBody.create(
bodyString,
MediaType.parse(requestBuilder.build().header("Content-Type"))
);
requestBuilder.method(method.name(), requestBody);
} else {
requestBuilder.method(method.name(), null);
}
// Execute request with custom timeout
OkHttpClient client = HTTP_CLIENT.newBuilder()
.connectTimeout(Duration.ofSeconds(timeout))
.readTimeout(Duration.ofSeconds(timeout))
.writeTimeout(Duration.ofSeconds(timeout))
.build();
try (Response response = client.newCall(requestBuilder.build()).execute()) {
String responseBody = response.body() != null ?
response.body().string() : null;
Map<String, Object> responseMap = new HashMap<>();
if (responseBody != null && !responseBody.isEmpty()) {
try {
responseMap = OBJECT_MAPPER.readValue(responseBody, Map.class);
} catch (Exception e) {
responseMap.put("raw", responseBody);
}
}
return Output.builder()
.statusCode(response.code())
.headers(response.headers().toMultimap())
.body(responseMap)
.success(response.isSuccessful())
.message(response.message())
.build();
}
}
    private Map<String, String> renderMap(RunContext runContext, Map<String, String> map) throws Exception {
        // runContext.render() throws a checked exception, so iterate with a
        // plain loop rather than Map.forEach and a lambda
        if (map == null) {
            return null;
        }
        Map<String, String> rendered = new HashMap<>();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            rendered.put(entry.getKey(), runContext.render(entry.getValue()));
        }
        return rendered;
    }

    private Object renderBody(RunContext runContext, Object body) throws Exception {
        if (body == null) {
            return null;
        }
        if (body instanceof String) {
            return runContext.render((String) body);
        }
        if (body instanceof Map) {
            // Deep-render string keys and values
            Map<?, ?> map = (Map<?, ?>) body;
            Map<Object, Object> rendered = new HashMap<>();
            for (Map.Entry<?, ?> entry : map.entrySet()) {
                Object key = entry.getKey() instanceof String
                    ? runContext.render((String) entry.getKey()) : entry.getKey();
                Object value = entry.getValue() instanceof String
                    ? runContext.render((String) entry.getValue()) : entry.getValue();
                rendered.put(key, value);
            }
            return rendered;
        }
        return body;
    }
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(title = "HTTP status code")
private final Integer statusCode;
@Schema(title = "Response headers")
private final Map<String, java.util.List<String>> headers;
@Schema(title = "Response body")
private final Map<String, Object> body;
@Schema(title = "Whether the request was successful")
private final Boolean success;
@Schema(title = "Response message")
private final String message;
}
}
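The OkHttp `HttpUrl.Builder` logic in `run` (base URL plus endpoint, then query parameters appended) has a direct standard-library analogue; this hypothetical helper reproduces the expected URL shape:

```python
from urllib.parse import urlencode

def build_url(base_url, endpoint, query_params=None):
    """Join base URL and endpoint, then append an encoded query string."""
    url = base_url.rstrip("/") + endpoint
    if query_params:
        url += "?" + urlencode(query_params)
    return url

url = build_url("https://api.company.com/v1", "/users",
                {"status": "active", "limit": "100"})
print(url)  # https://api.company.com/v1/users?status=active&limit=100
```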
Plugin Configuration (pom.xml)
<!-- pom.xml for custom plugin -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.company.kestra</groupId>
<artifactId>kestra-plugin-custom</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>Kestra Custom Plugin</name>
<description>Custom plugins for Company's systems</description>
<properties>
<kestra.version>0.15.0</kestra.version>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Kestra Core -->
<dependency>
<groupId>io.kestra</groupId>
<artifactId>core</artifactId>
<version>${kestra.version}</version>
<scope>provided</scope>
</dependency>
<!-- External Dependencies -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.11.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>io.kestra</groupId>
<artifactId>junit5</artifactId>
<version>${kestra.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Testing Custom Plugins
// src/test/java/com/company/kestra/plugin/custom/CustomApiClientTest.java
package com.company.kestra.plugin.custom;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
@MicronautTest
class CustomApiClientTest {
@Inject
private RunContextFactory runContextFactory;
@Test
void testApiCall() throws Exception {
// Create task instance
CustomApiClient task = CustomApiClient.builder()
.id("test-api")
.type(CustomApiClient.class.getName())
.baseUrl("https://httpbin.org")
.endpoint("/get")
.method(CustomApiClient.Method.GET)
.headers(Map.of(
"User-Agent", "Kestra-Plugin-Test"
))
.queryParams(Map.of(
"test", "value"
))
.build();
// Execute task
CustomApiClient.Output output = task.run(
TestsUtils.mockRunContext(
runContextFactory,
task,
new HashMap<>()
)
);
// Verify results
assertThat(output.getStatusCode(), is(200));
assertThat(output.getSuccess(), is(true));
assertThat(output.getBody(), notNullValue());
// Verify response contains our query param
Map<String, Object> args = (Map<String, Object>) output.getBody().get("args");
assertThat(args.get("test"), is("value"));
}
@Test
void testPostRequest() throws Exception {
Map<String, Object> requestBody = Map.of(
"name", "John Doe",
"email", "john@example.com",
"active", true
);
CustomApiClient task = CustomApiClient.builder()
.id("test-post")
.type(CustomApiClient.class.getName())
.baseUrl("https://httpbin.org")
.endpoint("/post")
.method(CustomApiClient.Method.POST)
.body(requestBody)
.build();
CustomApiClient.Output output = task.run(
TestsUtils.mockRunContext(
runContextFactory,
task,
new HashMap<>()
)
);
assertThat(output.getStatusCode(), is(200));
Map<String, Object> json = (Map<String, Object>)
output.getBody().get("json");
assertThat(json.get("name"), is("John Doe"));
assertThat(json.get("active"), is(true));
}
}
Plugin Documentation
# Custom Database Plugin
## Overview
This plugin allows querying custom database systems from Kestra workflows.
## Installation
1. Build the plugin:
```bash
mvn clean package
```
2. Copy the JAR to Kestra's plugins directory:
```bash
cp target/kestra-plugin-custom-1.0.0.jar /opt/kestra/plugins/
```
3. Restart Kestra.
Usage
Basic Query
id: query_custom_db
type: com.company.kestra.plugin.custom.CustomDatabaseQuery
connectionString: jdbc:custom://host:5432/database
username: admin
password: "{{ secret('DB_PASSWORD') }}"
sql: |
SELECT * FROM users
WHERE created_at > '2024-01-01'
fetch: true
Update Query
id: update_records
type: com.company.kestra.plugin.custom.CustomDatabaseQuery
connectionString: jdbc:custom://host:5432/database
sql: |
UPDATE users
SET status = 'inactive'
WHERE last_login < '2024-01-01'
fetch: false
Properties
| Property | Type | Required | Description |
|----------|------|----------|-------------|
| connectionString | String | Yes | JDBC connection string |
| username | String | No | Database username |
| password | String | No | Database password |
| sql | String | Yes | SQL query to execute |
| fetch | Boolean | No | Whether to fetch results (default: true) |
| fetchSize | Integer | No | Fetch size for large results (default: 1000) |
| queryTimeout | Integer | No | Query timeout in seconds (default: 30) |
Outputs
For SELECT queries (fetch: true):
{
"rowCount": 100,
"rows": [...],
"query": "SELECT * FROM users"
}
For UPDATE/INSERT/DELETE queries (fetch: false):
{
"rowCount": 50,
"query": "UPDATE users SET status = 'active'"
}
Examples
See the examples directory for more usage scenarios.
Part 4: Plugin Management and Optimization
Plugin Registry Configuration
# kestra.yml - Plugin configuration
kestra:
plugins:
# Plugin repositories
repositories:
- id: kestra-io
url: https://repo.kestra.io/repository/maven-public/
type: maven
releases: true
snapshots: true
- id: company-private
url: https://nexus.company.com/repository/maven-releases/
type: maven
releases: true
snapshots: false
authentication:
username: "{{ secret('NEXUS_USER') }}"
password: "{{ secret('NEXUS_PASS') }}"
# Plugin locations
locations:
- /opt/kestra/plugins # System plugins
- /app/plugins # User plugins
# Plugin scanning
scan:
enabled: true
interval: PT5M # Scan every 5 minutes
# Plugin isolation
isolation:
enabled: true
mode: PROCESS # PROCESS or THREAD
process:
maxThreads: 10
maxMemory: 1Gi
timeout: PT5M
# Cache configuration
cache:
enabled: true
size: 1000
expireAfterWrite: PT1H
Plugin Version Management
# flows/plugin-management.yml
id: plugin-version-manager
namespace: system.plugins
tasks:
- id: check_plugin_updates
type: io.kestra.plugin.core.http.Request
uri: "https://repo.kestra.io/repository/maven-public/io/kestra/plugin/maven-metadata.xml"
store: true
- id: parse_plugin_versions
type: io.kestra.plugin.scripts.python.Script
inputFiles:
metadata.xml: "{{ outputs.check_plugin_updates.uri }}"
script: |
import xml.etree.ElementTree as ET
import json
tree = ET.parse('metadata.xml')
root = tree.getroot()
versions = []
for version in root.find('.//versioning/versions'):
versions.append(version.text)
latest = root.find('.//versioning/latest').text
release = root.find('.//versioning/release').text
result = {
'total_versions': len(versions),
'latest_version': latest,
'release_version': release,
'all_versions': versions[-10:] # Last 10 versions
}
print(json.dumps(result, indent=2))
- id: update_plugins_if_needed
type: io.kestra.plugin.core.flow.EachSequential
value: ["postgresql", "s3", "snowflake", "bigquery"]
tasks:
- id: "check_{{ task.value }}_version"
type: io.kestra.plugin.core.debug.Return
format: "{{ task.value }}: 1.2.3" # Would be actual version check
- id: "update_{{ task.value }}_if_outdated"
type: io.kestra.plugin.core.flow.Subflow
namespace: system.maintenance
flowId: update-plugin
wait: true
inputs:
  plugin_name: "kestra-plugin-jdbc-{{ task.value }}"
  target_version: "latest"
runIf: "{{ true }}" # Replace with an actual version comparison
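The `check_..._version` task above stubs out the real comparison. Outside Kestra, that decision can be sketched as a small semantic-version comparison in plain Python (the plugin names and version numbers here are illustrative):

```python
def parse_version(v):
    """Split a dotted version string into an integer tuple for ordered comparison."""
    return tuple(int(part) for part in v.split("."))

def needs_update(current, latest):
    """Return True when the latest published version is newer than the installed one."""
    return parse_version(latest) > parse_version(current)

# Hypothetical installed vs. published versions
installed = {"postgresql": "1.2.3", "s3": "2.0.0"}
latest = {"postgresql": "1.3.0", "s3": "2.0.0"}

outdated = [name for name, v in installed.items() if needs_update(v, latest[name])]
print(outdated)  # → ['postgresql']
```

Tuple comparison handles multi-digit components correctly ("1.10.0" sorts after "1.9.0"), which naive string comparison gets wrong.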
Plugin Performance Optimization
# flows/plugin-performance.yml
id: plugin-performance-optimization
namespace: demo.performance
tasks:
# Batch operations instead of individual calls
- id: batch_database_operations
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
-- Batch insert
INSERT INTO users (id, name, email) VALUES
{% for user in inputs.users %}
('{{ user.id }}', '{{ user.name }}', '{{ user.email }}'){% if not loop.last %},{% endif %}
{% endfor %}
ON CONFLICT (id) DO UPDATE SET
name = EXCLUDED.name,
email = EXCLUDED.email;
fetch: false
# Connection pooling
- id: connection_pool_config
type: io.kestra.plugin.jdbc.postgresql.Query
url: jdbc:postgresql://localhost:5432/db
username: user
password: pass
sql: SELECT * FROM large_table
connectionProperties:
maximumPoolSize: "10"
minimumIdle: "5"
connectionTimeout: "30000"
idleTimeout: "600000"
maxLifetime: "1800000"
# Parallel plugin execution
- id: parallel_plugin_execution
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: api_call_1
type: io.kestra.plugin.core.http.Request
uri: "https://api.example.com/data1"
- id: api_call_2
type: io.kestra.plugin.core.http.Request
uri: "https://api.example.com/data2"
- id: api_call_3
type: io.kestra.plugin.core.http.Request
uri: "https://api.example.com/data3"
# Plugin caching
- id: cached_operation
type: io.kestra.plugin.core.cache.Cached
key: "expensive_operation:{{ inputs.user_id }}"
ttl: PT1H
tasks:
- id: expensive_computation
type: io.kestra.plugin.scripts.python.Script
script: |
import time
import json
# Simulate expensive computation
time.sleep(5)
result = {"computed": True, "value": 42}
print(json.dumps(result))
# Memory optimization for large data
- id: process_large_data
type: io.kestra.plugin.jdbc.postgresql.Query
sql: SELECT * FROM very_large_table
fetch: true
fetchSize: 10000 # Process in chunks
chunkSize: 1000 # Smaller chunks for memory efficiency
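The batch-insert idea behind the first task can be sketched outside Kestra in plain Python: chunk the rows, then build one parameterized multi-row INSERT per chunk (the table and column names here are illustrative):

```python
def chunked(rows, size):
    """Yield successive fixed-size batches from a list of rows."""
    for i in range(0, len(rows), size):
        yield rows[i:i + size]

def build_batch_insert(table, columns, batch):
    """Build one parameterized multi-row INSERT statement for a batch of rows."""
    row_placeholder = "(" + ", ".join(["%s"] * len(columns)) + ")"
    placeholders = ", ".join(row_placeholder for _ in batch)
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES {placeholders}"
    params = [value for row in batch for value in row]  # flatten for the driver
    return sql, params

rows = [(1, "a"), (2, "b"), (3, "c")]
for batch in chunked(rows, 2):
    sql, params = build_batch_insert("users", ["id", "name"], batch)
    print(sql, params)
```

Parameterized placeholders avoid the SQL-injection risk of interpolating values directly into the statement, and one round-trip per batch is far cheaper than one per row.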
Plugin Monitoring and Metrics
# flows/plugin-monitoring.yml
id: plugin-monitoring-dashboard
namespace: system.monitoring
tasks:
- id: collect_plugin_metrics
type: io.kestra.plugin.jdbc.postgresql.Query
fetch: true
sql: |
WITH plugin_stats AS (
SELECT
SUBSTRING(task_type FROM 'plugin\.([^.]*)') as plugin_name,
COUNT(*) as execution_count,
AVG(EXTRACT(EPOCH FROM (end_date - start_date))) * 1000 as avg_duration_ms,
PERCENTILE_CONT(0.95) WITHIN GROUP (
ORDER BY EXTRACT(EPOCH FROM (end_date - start_date)) * 1000
) as p95_duration_ms,
SUM(CASE WHEN state = 'FAILED' THEN 1 ELSE 0 END) as failure_count
FROM executions
WHERE start_date >= NOW() - INTERVAL '7 days'
AND task_type LIKE 'io.kestra.plugin.%'
GROUP BY plugin_name
),
plugin_health AS (
SELECT
plugin_name,
execution_count,
avg_duration_ms,
p95_duration_ms,
failure_count,
CASE
WHEN failure_count = 0 THEN 'HEALTHY'
WHEN failure_count::float / execution_count < 0.01 THEN 'WARNING'
ELSE 'CRITICAL'
END as health_status
FROM plugin_stats
)
SELECT * FROM plugin_health
ORDER BY execution_count DESC
- id: generate_plugin_report
type: io.kestra.plugin.scripts.python.Script
inputFiles:
metrics.json: |
{{ outputs.collect_plugin_metrics.rows | tojson }}
script: |
import json
import pandas as pd
from datetime import datetime
# Load the metrics file provided via inputFiles
with open('metrics.json') as f:
    data = json.load(f)
df = pd.DataFrame(data)
# Create summary report
report = {
'generated_at': datetime.now().isoformat(),
'total_plugins': len(df),
'total_executions': df['execution_count'].sum(),
'health_summary': {
'healthy': len(df[df['health_status'] == 'HEALTHY']),
'warning': len(df[df['health_status'] == 'WARNING']),
'critical': len(df[df['health_status'] == 'CRITICAL'])
},
'top_plugins_by_usage': df.nlargest(10, 'execution_count').to_dict('records'),
'slowest_plugins': df.nlargest(5, 'p95_duration_ms').to_dict('records'),
'most_failing_plugins': df.nlargest(5, 'failure_count').to_dict('records')
}
print(json.dumps(report, indent=2))
- id: alert_on_plugin_issues
type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
url: "{{ secret('SLACK_WEBHOOK_URL') }}"
payload: |
{
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "⚠️ Plugin Health Alert"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Critical Plugins:*\n{% for plugin in outputs.generate_plugin_report.vars.most_failing_plugins %}- {{ plugin.plugin_name }}: {{ plugin.failure_count }} failures\n{% endfor %}"
}
}
]
}
runIf: "{{ outputs.generate_plugin_report.vars.health_summary.critical > 0 }}"
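The CASE expression in the metrics query encodes the health policy: zero failures is healthy, under a 1% failure rate is a warning, anything more is critical. The same rule as a small, unit-testable Python function:

```python
def health_status(failure_count, execution_count):
    """Mirror the SQL CASE: no failures → HEALTHY, <1% failure rate → WARNING, else CRITICAL."""
    if failure_count == 0:
        return "HEALTHY"
    if failure_count / execution_count < 0.01:
        return "WARNING"
    return "CRITICAL"

print(health_status(0, 500))  # → HEALTHY
```

Keeping the threshold in one function makes it easy to adjust the 1% cutoff per team without touching the SQL.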
Part 5: Enterprise Plugin Patterns
Plugin Factory Pattern
// Plugin factory for creating plugin instances dynamically
public class PluginFactory {
private final Map<String, PluginCreator> creators = new HashMap<>();
public PluginFactory() {
// Register plugin creators
creators.put("salesforce", new SalesforcePluginCreator());
creators.put("sap", new SapPluginCreator());
creators.put("oracle", new OraclePluginCreator());
}
public Task createPlugin(String system, Map<String, Object> config) {
PluginCreator creator = creators.get(system.toLowerCase());
if (creator == null) {
throw new IllegalArgumentException("Unknown system: " + system);
}
return creator.create(config);
}
interface PluginCreator {
Task create(Map<String, Object> config);
}
static class SalesforcePluginCreator implements PluginCreator {
@Override
public Task create(Map<String, Object> config) {
return SalesforceQuery.builder()
.instanceUrl((String) config.get("instanceUrl"))
.clientId((String) config.get("clientId"))
.clientSecret((String) config.get("clientSecret"))
.soql((String) config.get("query"))
.build();
}
}
}
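The same registry-based factory reads naturally in Python as well; this sketch mirrors the Java version above with an illustrative creator function (not a real Kestra API):

```python
class PluginFactory:
    """Registry of creator callables, keyed by target system name."""

    def __init__(self):
        self._creators = {}

    def register(self, system, creator):
        """Register a callable that builds a task-like object from a config dict."""
        self._creators[system.lower()] = creator

    def create(self, system, config):
        """Look up the creator for a system and build a task from config."""
        creator = self._creators.get(system.lower())
        if creator is None:
            raise ValueError(f"Unknown system: {system}")
        return creator(config)

factory = PluginFactory()
# Hypothetical creator: returns a plain dict standing in for a task object
factory.register("salesforce", lambda cfg: {"type": "SalesforceQuery", "soql": cfg["query"]})

task = factory.create("Salesforce", {"query": "SELECT Id FROM Account"})
print(task)
```

As in the Java version, lookup is case-insensitive and unknown systems fail fast with a clear error rather than returning None.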
Plugin Adapter Pattern
# flows/plugin-adapter.yml
id: legacy-system-adapter
namespace: enterprise.integration
tasks:
# Adapter for legacy SOAP service
- id: soap_to_rest_adapter
type: io.kestra.plugin.scripts.python.Script
script: |
import zeep
import requests
import json
# Legacy SOAP client
client = zeep.Client('https://legacy.company.com/wsdl')
# Convert REST request to SOAP
request_data = {{ inputs.rest_request | tojson }}
soap_response = client.service.ProcessRequest(
method=request_data['method'],
params=json.dumps(request_data['params'])
)
# Convert SOAP response to REST format
rest_response = {
'status': 'success' if soap_response.success else 'error',
'data': json.loads(soap_response.data),
'timestamp': soap_response.timestamp
}
print(json.dumps(rest_response))
# Adapter for mainframe (via FTP)
- id: mainframe_adapter
type: io.kestra.plugin.fs.sftp.Download
host: mainframe.company.com
port: 22
username: "{{ secret('MAINFRAME_USER') }}"
password: "{{ secret('MAINFRAME_PASS') }}"
from: "/reports/{{ execution.startDate | date('yyyyMMdd') }}.txt"
- id: parse_mainframe_data
type: io.kestra.plugin.scripts.python.Script
inputFiles:
mainframe_data.txt: "{{ outputs.mainframe_adapter.uri }}"
script: |
# Parse fixed-width mainframe format
import struct
import pandas as pd
with open('mainframe_data.txt', 'r') as f:
lines = f.readlines()
# Define fixed-width format (example)
format_spec = [
('customer_id', 10),
('name', 30),
('balance', 12),
('status', 1)
]
records = []
for line in lines:
record = {}
pos = 0
for field, width in format_spec:
value = line[pos:pos+width].strip()
record[field] = value
pos += width
records.append(record)
df = pd.DataFrame(records)
df.to_parquet('mainframe_parsed.parquet', index=False)
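The fixed-width parsing loop in the script above can be factored into a standalone, testable function; the sample record below follows the same format_spec:

```python
def parse_fixed_width(line, format_spec):
    """Slice one fixed-width record into a dict according to (field, width) pairs."""
    record, pos = {}, 0
    for field, width in format_spec:
        record[field] = line[pos:pos + width].strip()
        pos += width
    return record

spec = [("customer_id", 10), ("name", 30), ("balance", 12), ("status", 1)]
# Build a sample record with exact field widths
line = "CUST000001" + "John Doe".ljust(30) + "000001234.50" + "A"
print(parse_fixed_width(line, spec))
# → {'customer_id': 'CUST000001', 'name': 'John Doe', 'balance': '000001234.50', 'status': 'A'}
```

Isolating the slicing logic this way lets you unit test the layout against known records before pointing the flow at real mainframe exports.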
Plugin Security Patterns
# flows/secure-plugin-usage.yml
id: secure-plugin-patterns
namespace: enterprise.security
tasks:
# Secrets management
- id: secure_database_connection
type: io.kestra.plugin.jdbc.postgresql.Query
url: "jdbc:postgresql://{{ secret('DB_HOST') }}:5432/{{ secret('DB_NAME') }}"
username: "{{ secret('DB_USER') }}"
password: "{{ secret('DB_PASSWORD') }}"
sql: SELECT * FROM secure_table
connectionProperties:
ssl: "true"
sslmode: "verify-full"
sslrootcert: "/certs/ca.pem"
# Encrypted data handling
- id: process_encrypted_data
type: io.kestra.plugin.scripts.python.Script
script: |
import gnupg
import pandas as pd
from io import StringIO
# Initialize GPG
gpg = gnupg.GPG(gnupghome='/app/.gnupg')
# Decrypt data
with open('encrypted_data.pgp', 'rb') as f:
decrypted = gpg.decrypt_file(f, passphrase="{{ secret('GPG_PASSPHRASE') }}")
if decrypted.ok:
# Process decrypted data
df = pd.read_csv(StringIO(str(decrypted)))
# ... processing ...
# Re-encrypt results
encrypted = gpg.encrypt(
df.to_csv(index=False),
recipients=['data-team@company.com'],
always_trust=True
)
with open('results.pgp', 'w') as f:
f.write(str(encrypted))
else:
raise Exception(f"Decryption failed: {decrypted.stderr}")
docker:
image: python:3.11-slim
volumes:
- /secrets/gnupg:/app/.gnupg:ro
# Audit logging for sensitive operations
- id: audited_plugin_operation
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: log_operation_start
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
INSERT INTO audit_log
(operation, user_id, start_time, parameters, execution_id)
VALUES (
'pii_data_extract',
'{{ execution.user }}',
NOW(),
'{{ inputs | tojson }}',
'{{ execution.id }}'
)
- id: perform_sensitive_operation
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
SELECT
user_id,
email,
phone_number,
masked_ssn
FROM pii_data
WHERE department = '{{ inputs.department }}'
fetch: true
- id: log_operation_complete
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
UPDATE audit_log
SET
end_time = NOW(),
row_count = {{ outputs.perform_sensitive_operation.rowCount }},
status = 'SUCCESS'
WHERE execution_id = '{{ execution.id }}'
Plugin Testing Framework
# flows/plugin-testing-framework.yml
id: plugin-testing-pipeline
namespace: quality.assurance
inputs:
- name: plugin_name
type: STRING
required: true
- name: test_scenarios
type: JSON
required: true
tasks:
- id: setup_test_environment
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: start_test_database
type: io.kestra.plugin.docker.Run
image: postgres:15
env:
POSTGRES_PASSWORD: test
POSTGRES_DB: test_db
ports:
- 5432:5432
wait: true
maxDuration: PT1M
- id: start_test_api
type: io.kestra.plugin.docker.Run
image: mockserver/mockserver
command: -serverPort 1080
ports:
- 1080:1080
wait: true
maxDuration: PT1M
- id: run_plugin_tests
type: io.kestra.plugin.core.flow.EachSequential
value: "{{ inputs.test_scenarios }}"
tasks:
- id: "execute_test_{{ taskloop.index }}"
type: io.kestra.plugin.core.flow.Switch
value: "{{ task.value.type }}"
cases:
database:
- id: run_database_test
type: io.kestra.plugin.jdbc.postgresql.Query
url: jdbc:postgresql://localhost:5432/test_db
username: postgres
password: test
sql: "{{ task.value.query }}"
fetch: true
api:
- id: setup_api_mock
type: io.kestra.plugin.core.http.Request
uri: http://localhost:1080/mockserver/expectation
method: PUT
body: "{{ task.value.mock_config | tojson }}"
- id: test_api_call
type: io.kestra.plugin.core.http.Request
uri: "http://localhost:1080{{ task.value.endpoint }}"
method: "{{ task.value.method }}"
body: "{{ task.value.request_body | tojson }}"
performance:
- id: run_performance_test
type: io.kestra.plugin.core.flow.EachSequential
value: "{{ range(1, task.value.iterations + 1) }}"
maxParallel: 10
tasks:
- id: "iteration_{{ taskloop.value }}"
type: io.kestra.plugin.core.debug.Return
format: "{{ task.value }}"
- id: collect_test_results
type: io.kestra.plugin.scripts.python.Script
script: |
import json
import statistics
# Collect results from all tests
test_results = []
{% for i in range(1, inputs.test_scenarios|length + 1) %}
test_output = {{ outputs.run_plugin_tests.outputs['execute_test_' + i|string] | default({}) | tojson }}
if test_output:
test_results.append({
'scenario': {{ i }},
'success': test_output.get('state') == 'SUCCESS',
'duration': test_output.get('duration', 0)
})
{% endfor %}
# Calculate statistics
summary = {
'total_tests': len(test_results),
'passed_tests': sum(1 for r in test_results if r['success']),
'failed_tests': sum(1 for r in test_results if not r['success']),
'pass_rate': sum(1 for r in test_results if r['success']) / len(test_results) * 100 if test_results else 0,
'avg_duration': statistics.mean(r['duration'] for r in test_results) if test_results else 0,
'results': test_results
}
print(json.dumps(summary, indent=2))
- id: generate_test_report
type: io.kestra.plugin.notifications.email.EmailSend
to: "qa-team@company.com"
subject: "Plugin Test Results - {{ inputs.plugin_name }}"
htmlBody: |
<h1>Plugin Test Report</h1>
<p><strong>Plugin:</strong> {{ inputs.plugin_name }}</p>
<p><strong>Pass Rate:</strong> {{ outputs.collect_test_results.vars.pass_rate | round(2) }}%</p>
<p><strong>Total Tests:</strong> {{ outputs.collect_test_results.vars.total_tests }}</p>
<p><strong>Execution:</strong> {{ execution.id }}</p>
Part 6: Contributing to the Plugin Ecosystem
Open Source Contribution Guide
# 1. Fork the repository
https://github.com/kestra-io/kestra
# 2. Clone your fork
git clone https://github.com/YOUR_USERNAME/kestra.git
cd kestra
# 3. Create a feature branch
git checkout -b feature/new-plugin
# 4. Create plugin structure
mkdir -p plugin-<name>/src/main/java/io/kestra/plugin/<category>
mkdir -p plugin-<name>/src/test/java/io/kestra/plugin/<category>
# 5. Implement your plugin
# (Follow existing plugin patterns)
# 6. Write tests
# (Aim for >80% test coverage)
# 7. Update documentation
# - README.md in plugin directory
# - Add examples in @Plugin annotation
# - Update plugin list in docs
# 8. Run tests
mvn test -pl plugin-<name>
# 9. Submit pull request
git push origin feature/new-plugin
# Create PR on GitHub
Plugin Contribution Checklist
## New Plugin Contribution Checklist
### ✅ Code Quality
- [ ] Follows Kestra coding conventions
- [ ] Proper error handling and logging
- [ ] Input validation with @NotNull annotations
- [ ] Comprehensive JavaDoc comments
- [ ] No SonarQube issues
### ✅ Testing
- [ ] Unit tests for all public methods
- [ ] Integration tests with real systems
- [ ] Test edge cases and error conditions
- [ ] Mock external dependencies
- [ ] Test coverage > 80%
### ✅ Documentation
- [ ] README.md with usage examples
- [ ] @Plugin annotation with examples
- [ ] Swagger annotations for all properties
- [ ] Clear property descriptions
- [ ] Example flows in /examples directory
### ✅ Configuration
- [ ] Proper Maven pom.xml
- [ ] Plugin registration in parent pom
- [ ] Version aligned with Kestra core
- [ ] No conflicting dependencies
### ✅ Security
- [ ] Secrets marked with secret = true
- [ ] No hardcoded credentials
- [ ] Input sanitization
- [ ] SSL/TLS support where applicable
### ✅ Performance
- [ ] Connection pooling for databases
- [ ] Batch operations for bulk data
- [ ] Memory-efficient streaming
- [ ] Timeout configurations
Community Plugin Gallery
# Example of a well-documented community plugin
id: community-plugin-showcase
namespace: demo.community
tasks:
# Example: dbt plugin from community
- id: run_dbt_models
type: io.kestra.plugin.dbt.cli.Run
projectDir: "s3://my-dbt-project/"
profilesDir: "s3://my-dbt-profiles/"
target: prod
select:
- tag:hourly
- model_name+
exclude:
- tag:disabled
# Example: Great Expectations plugin
- id: data_quality_check
type: io.kestra.plugin.greatexpectations.Validation
expectationSuite: |
{
"expectation_suite_name": "customer_data",
"expectations": [
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {"column": "customer_id"}
}
]
}
data:
type: file
path: "{{ inputs.data_file }}"
# Example: Airbyte connector
- id: sync_with_airbyte
type: io.kestra.plugin.airbyte.Sync
connectionId: "{{ secret('AIRBYTE_CONNECTION_ID') }}"
workspaceId: "{{ secret('AIRBYTE_WORKSPACE_ID') }}"
apiUrl: "https://api.airbyte.com"
wait: true
# Example: Dagster integration
- id: trigger_dagster_pipeline
type: io.kestra.plugin.dagster.Run
repository: my_repo
pipeline: process_data
mode: default
runConfig:
resources:
io_manager:
config:
s3_bucket: "dagster-output"
Conclusion: Becoming a Plugin Master
Key Takeaways
Kestra's plugin ecosystem is vast: 150+ plugins covering every data engineering need
Custom plugins are straightforward: Java-based, with clear interfaces and patterns
Enterprise integration is possible: Create plugins for any system, no matter how legacy
Performance matters: Optimize plugins for production workloads
Community contribution is valuable: Share your plugins to help others
Next Steps for You
Audit your data stack: What systems need plugins?
Start simple: Create one custom plugin for an internal API
Contribute: Find a missing plugin in the ecosystem and build it
Share: Write a blog post about your plugin journey
Optimize: Review existing plugins for performance improvements
Resources
Official Documentation: https://kestra.io/docs/plugins/develop
Plugin Examples: https://github.com/kestra-io/examples
Community Forum: https://github.com/kestra-io/kestra/discussions
Plugin Template: https://github.com/kestra-io/plugin-template
In the next article, we'll dive into Monitoring and Observability in Kestra - how to ensure your workflows are running smoothly and troubleshoot when they're not.
Before the next article, try:
Create a simple custom plugin for a system you use
Optimize an existing plugin configuration for better performance
Contribute documentation or examples to an existing plugin
Set up a plugin testing pipeline for your team
Share your plugin creation experience in the Kestra community
Remember: Every great data platform starts with great plugins. What will you build?
Happy plugin developing! 🛠️