Article 6: Mastering Kestra's Plugin Ecosystem
Extending Kestra to Every Corner of Your Data Stack
Introduction: The Power of Plugins
Imagine you're a master chef. You don't just have one knife - you have specialized tools for every task: a paring knife for delicate work, a chef's knife for chopping, a boning knife for meat, and a bread knife for, well, bread. That's what Kestra's plugin ecosystem gives you: specialized tools for every data engineering task.
In this article, you'll learn to:
Navigate Kestra's vast plugin library (150+ plugins and counting)
Create your own custom plugins for proprietary systems
Build plugin-based architectures for enterprise integration
Debug and optimize plugin performance
Contribute to the open-source plugin ecosystem
Quick Stats:
150+ official plugins available
1,000+ companies using Kestra plugins
10,000+ plugin downloads per week
0 lines of code needed for most integrations
Part 1: The Plugin Architecture
Understanding Plugin Layers
Kestra's plugin architecture is like a LEGO system:
Application Layer (Your Flows)
↓
Plugin Interface (Java Interfaces)
↓
Plugin Implementation (Built-in/Custom)
↓
Target System (Databases, APIs, Cloud Services)
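Concretely, a flow only ever touches the top layer: the task's type string selects the plugin implementation, and the implementation talks to the target system for you. Here is a minimal sketch using the S3 plugin covered later in this article (bucket and key are placeholders):
id: layers-in-practice
namespace: demo.plugins

tasks:
  # Application layer: the flow declares *what* should happen
  - id: fetch_report
    # Plugin interface + implementation: resolved from this type string
    type: io.kestra.plugin.aws.s3.Download
    # Target system: the implementation handles the S3 API calls
    bucket: my-data-lake        # placeholder
    key: reports/latest.csv    # placeholder
    region: us-east-1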
Core Plugin Types
# Every plugin belongs to one of these categories
plugins:
  - Input/Output:
      - Filesystems: S3, GCS, Azure Blob, Local, FTP
      - Databases: PostgreSQL, MySQL, Snowflake, BigQuery
      - APIs: REST, GraphQL, SOAP
  - Processing:
      - Scripting: Python, R, Node.js, Bash
      - Data Processing: Spark, Flink, Pandas
      - ML/AI: TensorFlow, PyTorch, Hugging Face
  - Messaging:
      - Queues: Kafka, RabbitMQ, AWS SQS
      - Streaming: Kafka Streams, Apache Pulsar
  - Cloud Services:
      - AWS: S3, Lambda, Glue, Redshift
      - GCP: BigQuery, Cloud Functions, Dataflow
      - Azure: Blob Storage, Data Factory, Synapse
  - Monitoring:
      - Metrics: Prometheus, Datadog, New Relic
      - Logging: ELK, Splunk, CloudWatch
      - Alerting: Slack, PagerDuty, Email
  - Custom:
      - Enterprise Systems: SAP, Salesforce, Oracle
      - Internal APIs: Your company's systems
      - Legacy Systems: Mainframes, AS/400
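To make the categories concrete, here is a minimal sketch chaining a Processing plugin with a Monitoring plugin (the webhook secret name is a placeholder):
id: category-combo
namespace: demo.plugins

tasks:
  # Processing category: a script plugin does the work
  - id: crunch
    type: io.kestra.plugin.scripts.shell.Commands
    commands:
      - echo "rows_processed=42"

  # Monitoring category: an alerting plugin reports the result
  - id: notify
    type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
    url: "{{ secret('SLACK_WEBHOOK_URL') }}"
    payload: |
      {"text": "crunch finished in execution {{ execution.id }}"}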
Plugin Anatomy: What Makes a Good Plugin?
// The anatomy of a Kestra plugin (simplified)
public class MyPlugin extends Task implements RunnableTask<MyPlugin.Output> {
    // 1. Configuration (YAML properties)
    @PluginProperty private String connectionString;
    @PluginProperty private Integer timeout;

    // 2. Execution logic
    @Override
    public Output run(RunContext runContext) throws Exception {
        // Your plugin logic here
        return Output.builder().result("success").build();
    }

    // 3. Input/Output handling
    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        private final String result;
    }
}
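In a flow, those @PluginProperty fields surface as plain YAML keys. A sketch of how this hypothetical MyPlugin would be configured:
- id: my_task
  type: com.example.MyPlugin      # hypothetical fully qualified type
  connectionString: jdbc:custom://host:5432/db
  timeout: 30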
Part 2: Using Built-in Plugins
Plugin Discovery and Installation
# List available plugins
kestra plugins list
# Search for specific plugins
kestra plugins search --query "postgres"
# Install a plugin
kestra plugins install kestra-plugin-jdbc-postgresql
# View installed plugins
kestra plugins installed
# Update all plugins
kestra plugins update-all
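A quick way to confirm an installed plugin resolves correctly is a one-task smoke-test flow. A sketch for the PostgreSQL plugin installed above (connection details are placeholders):
id: verify-postgres-plugin
namespace: demo.plugins

tasks:
  - id: ping
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: jdbc:postgresql://localhost:5432/mydb   # placeholder
    username: "{{ secret('PG_USER') }}"
    password: "{{ secret('PG_PASS') }}"
    sql: SELECT 1 AS ok
    fetch: true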
Database Plugins Deep Dive
# flows/database-demo.yml
id: database-plugin-showcase
namespace: demo.plugins
description: Showcase of database plugins
tasks:
# PostgreSQL - Full-featured SQL database
- id: postgres_example
type: io.kestra.plugin.jdbc.postgresql.Query
url: jdbc:postgresql://localhost:5432/mydb
username: "{{ secret('PG_USER') }}"
password: "{{ secret('PG_PASS') }}"
sql: |
-- Templated query (values are rendered by Kestra before execution)
SELECT * FROM users
WHERE created_at > '{{ execution.startDate | dateAdd(-7, 'DAYS') }}'
AND status = '{{ inputs.user_status }}'
fetch: true
fetchSize: 1000
store: true
# Snowflake - Cloud data warehouse
- id: snowflake_example
type: io.kestra.plugin.jdbc.snowflake.Query
url: jdbc:snowflake://account.snowflakecomputing.com
username: "{{ secret('SNOWFLAKE_USER') }}"
password: "{{ secret('SNOWFLAKE_PASS') }}"
role: SYSADMIN
warehouse: COMPUTE_WH
database: PRODUCTION
schema: ANALYTICS
sql: |
COPY INTO @my_stage/users.parquet
FROM (
SELECT * FROM users
WHERE DAY = '{{ execution.startDate | date('yyyy-MM-dd') }}'
)
FILE_FORMAT = (TYPE = PARQUET)
store: true
# BigQuery - Serverless data warehouse
- id: bigquery_example
type: io.kestra.plugin.gcp.bigquery.Query
sql: |
SELECT
user_id,
COUNT(*) as session_count,
SUM(duration) as total_duration
FROM `project.dataset.sessions`
WHERE DATE(timestamp) = '{{ execution.startDate | date('yyyy-MM-dd') }}'
GROUP BY user_id
destinationTable:
projectId: my-project
datasetId: results
tableId: daily_sessions
writeDisposition: WRITE_TRUNCATE
createDisposition: CREATE_IF_NEEDED
# ClickHouse - OLAP database
- id: clickhouse_example
type: io.kestra.plugin.jdbc.clickhouse.Query
url: jdbc:clickhouse://localhost:8123/analytics
sql: |
SELECT
toStartOfDay(timestamp) as day,
uniq(user_id) as daily_active_users,
count(*) as events
FROM events
WHERE timestamp >= '{{ execution.startDate | dateAdd(-30, 'DAYS') }}'
GROUP BY day
ORDER BY day
fetch: true
# Multi-database Join (Virtual Data Warehouse)
- id: cross_database_join
type: io.kestra.plugin.scripts.python.Script
description: "Join data from multiple databases"
inputFiles:
postgres_data.json: "{{ outputs.postgres_example.uri }}"
snowflake_data.json: "{{ outputs.snowflake_example.uri }}"
bigquery_data.json: "{{ outputs.bigquery_example.uri }}"
script: |
import pandas as pd
import json
# Load from different sources
pg_df = pd.read_json('postgres_data.json')
sf_df = pd.read_json('snowflake_data.json')
bq_df = pd.read_json('bigquery_data.json')
# Perform cross-database join
merged = pd.merge(
pg_df,
sf_df,
on='user_id',
how='left'
).merge(
bq_df,
on='user_id',
how='left'
)
# Save result
merged.to_parquet('cross_database_result.parquet', index=False)
print(f"Merged {len(merged)} records from 3 databases")
Cloud Storage Plugins
# flows/cloud-storage.yml
id: cloud-storage-operations
namespace: demo.plugins
tasks:
# AWS S3 Operations
- id: s3_operations
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: upload_to_s3
type: io.kestra.plugin.aws.s3.Upload
from: "data/output.parquet"
bucket: my-data-lake
key: "raw/{{ execution.startDate | date('yyyy/MM/dd') }}/data.parquet"
region: us-east-1
accessKeyId: "{{ secret('AWS_ACCESS_KEY') }}"
secretKeyId: "{{ secret('AWS_SECRET_KEY') }}"
- id: list_s3_files
type: io.kestra.plugin.aws.s3.List
bucket: my-data-lake
region: us-east-1
prefix: "raw/{{ execution.startDate | date('yyyy/MM/dd') }}/"
- id: sync_s3_to_local
type: io.kestra.plugin.aws.s3.Download
bucket: my-data-lake
key: "{{ outputs.list_s3_files.uris[0] }}"
region: us-east-1
# Google Cloud Storage
- id: gcs_operations
type: io.kestra.plugin.gcp.gcs.Upload
from: "data/processed.parquet"
bucket: my-gcp-bucket
name: "processed/{{ execution.startDate | date('yyyy-MM-dd') }}.parquet"
projectId: my-gcp-project
# Azure Blob Storage
- id: azure_operations
type: io.kestra.plugin.azure.blob.Upload
from: "data/archive.parquet"
connectionString: "{{ secret('AZURE_CONNECTION_STRING') }}"
container: my-container
name: "archive/{{ execution.startDate | date('yyyy/MM') }}/data.parquet"
# Multi-cloud synchronization
- id: multi_cloud_sync
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: sync_to_aws
type: io.kestra.plugin.aws.s3.Copy
from: "{{ outputs.gcs_operations.uri }}"
to: "s3://my-data-lake/gcs-mirror/{{ execution.startDate | date('yyyy-MM-dd') }}.parquet"
- id: sync_to_azure
type: io.kestra.plugin.azure.blob.Copy
from: "{{ outputs.gcs_operations.uri }}"
to: "https://myaccount.blob.core.windows.net/mycontainer/gcs-mirror/data.parquet"
Messaging and Streaming Plugins
# flows/messaging-demo.yml
id: messaging-patterns
namespace: demo.plugins
tasks:
# Kafka Producer
- id: produce_to_kafka
type: io.kestra.plugin.kafka.Produce
topic: user-events
key: "{{ inputs.user_id }}"
value: |
{
"event_type": "{{ inputs.event_type }}",
"user_id": "{{ inputs.user_id }}",
"timestamp": "{{ now() }}",
"properties": {{ inputs.properties | tojson }}
}
properties:
bootstrap.servers: "kafka-broker:9092"
acks: "all"
compression.type: "snappy"
serializer: STRING
# Kafka Consumer (Stream Processing)
- id: consume_from_kafka
type: io.kestra.plugin.kafka.Consume
topic: user-events
groupId: "etl-processor-{{ execution.startDate | date('yyyy-MM-dd') }}"
maxRecords: 1000
timeout: PT1M
properties:
bootstrap.servers: "kafka-broker:9092"
auto.offset.reset: "earliest"
# RabbitMQ
- id: rabbitmq_example
type: io.kestra.plugin.amqp.Write
uri: "amqp://user:pass@rabbitmq:5672"
exchange: events
routingKey: "user.{{ inputs.user_type }}"
content: |
{{ inputs.event_data | tojson }}
contentType: "application/json"
# AWS SQS
- id: sqs_example
type: io.kestra.plugin.aws.sqs.Send
queueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
messageBody: |
{{ inputs.message | tojson }}
region: us-east-1
# Real-time Data Pipeline
- id: real_time_pipeline
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: consume_stream
type: io.kestra.plugin.kafka.Consume
topic: clickstream
groupId: real-time-processor
maxRecords: 100
timeout: PT10S
- id: process_in_realtime
type: io.kestra.plugin.scripts.python.Script
inputFiles:
messages.json: "{{ outputs.consume_stream.uri }}"
script: |
import json
import pandas as pd
from datetime import datetime
with open('messages.json', 'r') as f:
messages = json.load(f)
# Real-time processing
events = []
for msg in messages:
event = json.loads(msg['value'])
# Enrich in real-time
event['processed_at'] = datetime.now().isoformat()
event['hour_of_day'] = datetime.fromisoformat(event['timestamp']).hour
event['is_peak'] = 9 <= event['hour_of_day'] <= 17
events.append(event)
# Batch write for efficiency
if events:
df = pd.DataFrame(events)
df.to_parquet(f'processed_{datetime.now().strftime("%H%M%S")}.parquet')
print(f"Processed {len(events)} events in real-time")
- id: produce_results
type: io.kestra.plugin.kafka.Produce
topic: processed-events
key: "{{ execution.startDate | date('yyyy-MM-dd HH') }}"
value: "{{ outputs.process_in_realtime.outputFiles | tojson }}"
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: "{{ outputs.process_in_realtime.state.current == 'SUCCESS' }}"
Scripting and Data Processing Plugins
# flows/scripting-demo.yml
id: multi-language-scripts
namespace: demo.plugins
tasks:
# Python with dependencies
- id: python_with_deps
type: io.kestra.plugin.scripts.python.Script
description: "Python script with custom dependencies"
script: |
import pandas as pd
import numpy as np
from scipy import stats
import requests
# Your data science code here
data = pd.DataFrame({
'x': np.random.normal(0, 1, 1000),
'y': np.random.normal(0, 1, 1000)
})
correlation = data['x'].corr(data['y'])
print(f"Correlation: {correlation:.3f}")
# Save results
data.to_parquet('analysis_result.parquet', index=False)
docker:
image: "kestra/kestra-python:latest"
volumes:
- /tmp:/tmp
inputFiles:
config.yaml: |
dataset:
name: "sample_data"
version: "1.0"
outputFiles:
- "*.parquet"
- "*.csv"
# R for statistical analysis
- id: r_analysis
type: io.kestra.plugin.scripts.r.Script
script: |
library(dplyr)
library(ggplot2)
library(jsonlite)
# Load data
data <- read.csv("input_data.csv")
# Statistical analysis
result <- data %>%
group_by(category) %>%
summarise(
mean = mean(value),
sd = sd(value),
n = n()
)
# Create visualization
plot <- ggplot(data, aes(x=category, y=value)) +
geom_boxplot() +
theme_minimal()
ggsave("boxplot.png", plot, width=8, height=6)
# Output results
write_json(result, "analysis_result.json")
cat("Analysis complete\n")
docker:
image: rocker/tidyverse:latest
# Node.js for API interactions
- id: nodejs_api_client
type: io.kestra.plugin.scripts.node.Script
script: |
const axios = require('axios');
const fs = require('fs').promises;
async function fetchData() {
const response = await axios.get('https://api.example.com/data');
const data = response.data;
// Transform data
const transformed = data.map(item => ({
id: item.id,
name: item.name.toUpperCase(),
timestamp: new Date().toISOString()
}));
// Save to file
await fs.writeFile(
'api_data.json',
JSON.stringify(transformed, null, 2)
);
console.log(`Fetched ${transformed.length} records`);
}
fetchData().catch(console.error);
docker:
image: node:18-alpine
# Bash for system operations
- id: bash_operations
type: io.kestra.plugin.scripts.shell.Commands
commands:
- find /data -name "*.parquet" -type f -mtime -7 | wc -l
- du -sh /data/* | sort -hr
- echo "Processing started at $(date)"
- tar -czf archive_$(date +%Y%m%d_%H%M%S).tar.gz /data/input
exitOnFailed: false
# SQL-based transformation
- id: sql_transformation
type: io.kestra.plugin.jdbc.duckdb.Query
sql: |
-- Load data from multiple formats
CREATE TABLE users AS
SELECT * FROM read_parquet('users.parquet');
CREATE TABLE orders AS
SELECT * FROM read_csv('orders.csv');
-- Complex transformation
WITH user_stats AS (
SELECT
u.user_id,
u.join_date,
COUNT(o.order_id) as order_count,
SUM(o.amount) as total_spent,
AVG(o.amount) as avg_order_value
FROM users u
LEFT JOIN orders o ON u.user_id = o.user_id
GROUP BY u.user_id, u.join_date
)
SELECT * FROM user_stats
ORDER BY total_spent DESC;
fetch: true
store: true
Machine Learning Plugins
# flows/ml-pipeline.yml
id: ml-training-pipeline
namespace: demo.ml
tasks:
# Data Preparation
- id: prepare_training_data
type: io.kestra.plugin.scripts.python.Script
script: |
import pandas as pd
from sklearn.model_selection import train_test_split
import joblib
# Load and prepare data
df = pd.read_parquet('raw_data.parquet')
# Feature engineering
df['feature1'] = df['value1'] * df['value2']
df['feature2'] = df['value3'].apply(lambda x: 1 if x > 0 else 0)
# Split data
X = df[['feature1', 'feature2', 'value4']]
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Save prepared data
joblib.dump({
'X_train': X_train,
'X_test': X_test,
'y_train': y_train,
'y_test': y_test,
'feature_names': X.columns.tolist()
}, 'prepared_data.pkl')
print(f"Prepared data: {len(X_train)} training, {len(X_test)} test samples")
# Model Training with TensorFlow
- id: train_tensorflow_model
type: io.kestra.plugin.scripts.python.Script
inputFiles:
data.pkl: "{{ outputs.prepare_training_data.outputFiles['prepared_data.pkl'] }}"
script: |
import tensorflow as tf
from tensorflow import keras
import joblib
import numpy as np
# Load prepared data
data = joblib.load('data.pkl')
X_train, X_test, y_train, y_test = (
data['X_train'], data['X_test'],
data['y_train'], data['y_test']
)
# Build model
model = keras.Sequential([
keras.layers.Dense(64, activation='relu', input_shape=(X_train.shape[1],)),
keras.layers.Dropout(0.2),
keras.layers.Dense(32, activation='relu'),
keras.layers.Dense(1, activation='sigmoid')
])
model.compile(
optimizer='adam',
loss='binary_crossentropy',
metrics=['accuracy']
)
# Train model
history = model.fit(
X_train, y_train,
validation_split=0.2,
epochs=10,
batch_size=32,
verbose=1
)
# Evaluate
test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
print(f"Test accuracy: {test_acc:.4f}")
# Save model in TensorFlow SavedModel format
# (the Triton config below declares tensorflow_savedmodel, which HDF5 won't satisfy)
model.save('trained_model.savedmodel')
# Save metrics
metrics = {
'test_accuracy': float(test_acc),
'test_loss': float(test_loss),
'training_history': history.history
}
import json
with open('model_metrics.json', 'w') as f:
json.dump(metrics, f)
docker:
image: tensorflow/tensorflow:2.13.0
gpu: true # Enable GPU if available
# Model Serving with Triton
- id: deploy_model_triton
type: io.kestra.plugin.triton.DeployModel
modelRepository: "/models"
modelName: "customer-churn"
modelVersion: "1"
modelFile: "{{ outputs.train_tensorflow_model.outputFiles['trained_model.savedmodel'] }}"
config: |
platform: "tensorflow_savedmodel"
max_batch_size: 64
input [
{
name: "input"
data_type: TYPE_FP32
dims: [ -1, 3 ]
}
]
output [
{
name: "output"
data_type: TYPE_FP32
dims: [ -1, 1 ]
}
]
# ML Monitoring with Evidently
- id: monitor_model_drift
type: io.kestra.plugin.scripts.python.Script
script: |
import pandas as pd
import numpy as np
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
import joblib
# Load reference data (training data)
ref_data = pd.read_parquet('reference_data.parquet')
# Load current data (production data)
curr_data = pd.read_parquet('current_data.parquet')
# Generate drift report
drift_report = Report(metrics=[DataDriftPreset()])
drift_report.run(
reference_data=ref_data,
current_data=curr_data
)
# Save report
drift_report.save_html('data_drift_report.html')
# Extract metrics
drift_metrics = drift_report.as_dict()
drift_score = drift_metrics['metrics'][0]['result']['dataset_drift']
print(f"Data drift detected: {drift_score}")
# Alert if drift exceeds threshold
if drift_score:
print("ALERT: Significant data drift detected!")
Part 3: Creating Custom Plugins
Plugin Development Environment Setup
# Create plugin project
mvn archetype:generate \
-DarchetypeGroupId=io.kestra \
-DarchetypeArtifactId=kestra-plugin-archetype \
-DarchetypeVersion=0.15.0 \
-DgroupId=com.company.kestra \
-DartifactId=kestra-plugin-custom \
-Dversion=1.0.0 \
-Dpackage=com.company.kestra.plugin.custom
# Build plugin
cd kestra-plugin-custom
mvn clean package
# Install locally
mvn install
# Test plugin
mvn test
# Deploy to Kestra
cp target/kestra-plugin-custom-1.0.0.jar /path/to/kestra/plugins/
Simple Custom Plugin Example
// src/main/java/com/company/kestra/plugin/custom/HelloWorld.java
package com.company.kestra.plugin.custom;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import javax.validation.constraints.NotNull;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Plugin(
examples = {
@Example(
title = "Say hello to someone",
code = {
"id: hello",
"type: com.company.kestra.plugin.custom.HelloWorld",
"name: John"
}
)
}
)
@Schema(
title = "Hello World Task",
description = "A simple custom plugin that greets someone"
)
public class HelloWorld extends Task implements RunnableTask<HelloWorld.Output> {
@PluginProperty
@Schema(title = "Name to greet")
@NotNull
private String name;
@PluginProperty
@Schema(title = "Greeting message")
private String message;
@Override
public Output run(RunContext runContext) throws Exception {
String greeting = message != null ? message : "Hello";
String fullMessage = greeting + ", " + name + "!";
// Log the greeting
runContext.logger().info(fullMessage);
// You can also render variables
String renderedName = runContext.render(name);
return Output.builder()
.greeting(fullMessage)
.renderedName(renderedName)
.executionTime(System.currentTimeMillis())
.build();
}
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(title = "The greeting message")
private final String greeting;
@Schema(title = "Rendered name")
private final String renderedName;
@Schema(title = "Execution time in milliseconds")
private final Long executionTime;
}
}
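Once the packaged JAR sits in Kestra's plugins directory, the task is referenced by its fully qualified class name like any built-in plugin. A minimal flow exercising it and echoing the typed output via the core Log task:
id: hello-custom-plugin
namespace: demo.custom

tasks:
  - id: greet
    type: com.company.kestra.plugin.custom.HelloWorld
    name: John
    message: Hi

  - id: echo
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.greet.greeting }} (rendered name: {{ outputs.greet.renderedName }})"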
Database Query Plugin Example
// src/main/java/com/company/kestra/plugin/custom/CustomDatabaseQuery.java
package com.company.kestra.plugin.custom;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import javax.validation.constraints.NotNull;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Plugin(
examples = {
@Example(
title = "Query custom database",
code = {
"id: custom_query",
"type: com.company.kestra.plugin.custom.CustomDatabaseQuery",
"connectionString: jdbc:custom://host:port/db",
"username: admin",
"password: \"{{ secret('CUSTOM_DB_PASS') }}\"",
"sql: SELECT * FROM users WHERE active = true"
}
)
}
)
@Schema(title = "Custom Database Query", description = "Query a custom database system")
public class CustomDatabaseQuery extends Task implements RunnableTask<CustomDatabaseQuery.Output> {
@NotNull
@PluginProperty(dynamic = true)
@Schema(title = "JDBC connection string")
private String connectionString;
@PluginProperty(dynamic = true)
@Schema(title = "Database username")
private String username;
@PluginProperty(dynamic = true)
@Schema(title = "Database password", secret = true)
private String password;
@NotNull
@PluginProperty(dynamic = true)
@Schema(title = "SQL query to execute")
private String sql;
@PluginProperty
@Schema(title = "Fetch results (true) or execute without results (false)")
@Builder.Default
private Boolean fetch = true;
@PluginProperty
@Schema(title = "Fetch size for large results")
@Builder.Default
private Integer fetchSize = 1000;
@PluginProperty
@Schema(title = "Query timeout in seconds")
@Builder.Default
private Integer queryTimeout = 30;
@Override
public Output run(RunContext runContext) throws Exception {
// Render dynamic properties
String renderedConnectionString = runContext.render(connectionString);
String renderedUsername = runContext.render(username);
String renderedPassword = runContext.render(password);
String renderedSql = runContext.render(sql);
Connection connection = null;
PreparedStatement statement = null;
ResultSet resultSet = null;
try {
// Load custom JDBC driver
Class.forName("com.custom.Driver");
// Create connection
connection = DriverManager.getConnection(
renderedConnectionString,
renderedUsername,
renderedPassword
);
// Create statement
statement = connection.prepareStatement(renderedSql);
statement.setFetchSize(fetchSize);
statement.setQueryTimeout(queryTimeout);
if (fetch) {
// Execute query and fetch results
resultSet = statement.executeQuery();
List<Map<String, Object>> rows = new ArrayList<>();
ResultSetMetaData metaData = resultSet.getMetaData();
int columnCount = metaData.getColumnCount();
while (resultSet.next()) {
Map<String, Object> row = new HashMap<>();
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i);
Object value = resultSet.getObject(i);
row.put(columnName, value);
}
rows.add(row);
}
return Output.builder()
.rowCount(rows.size())
.rows(rows)
.query(renderedSql)
.build();
} else {
// Execute update
int affectedRows = statement.executeUpdate();
return Output.builder()
.rowCount(affectedRows)
.query(renderedSql)
.build();
}
} finally {
// Clean up resources
if (resultSet != null) {
try { resultSet.close(); } catch (SQLException ignored) {}
}
if (statement != null) {
try { statement.close(); } catch (SQLException ignored) {}
}
if (connection != null) {
try { connection.close(); } catch (SQLException ignored) {}
}
}
}
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(title = "Number of rows affected or returned")
private final Integer rowCount;
@Schema(title = "Query results (only for SELECT queries)")
private final List<Map<String, Object>> rows;
@Schema(title = "Executed SQL query")
private final String query;
}
}
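Downstream tasks can consume the plugin's typed output directly. A sketch that fans out over the returned rows with the core EachSequential flowable (inside the loop, taskrun.value holds the current row):
- id: active_users
  type: com.company.kestra.plugin.custom.CustomDatabaseQuery
  connectionString: jdbc:custom://host:5432/database
  username: admin
  password: "{{ secret('CUSTOM_DB_PASS') }}"
  sql: SELECT user_id FROM users WHERE active = true

- id: per_user
  type: io.kestra.plugin.core.flow.EachSequential
  value: "{{ outputs.active_users.rows }}"
  tasks:
    - id: log_user
      type: io.kestra.plugin.core.log.Log
      message: "Processing user {{ taskrun.value }}"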
REST API Plugin Example
// src/main/java/com/company/kestra/plugin/custom/CustomApiClient.java
package com.company.kestra.plugin.custom;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import okhttp3.*;
import javax.validation.constraints.NotNull;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Plugin(
examples = {
@Example(
title = "Call custom API",
code = {
"id: call_custom_api",
"type: com.company.kestra.plugin.custom.CustomApiClient",
"baseUrl: https://api.company.com/v1",
"endpoint: /users",
"method: GET",
"headers:",
" Authorization: Bearer {{ secret('API_TOKEN') }}",
"queryParams:",
" status: active",
" limit: 100"
}
)
}
)
@Schema(title = "Custom API Client", description = "Interact with a custom REST API")
public class CustomApiClient extends Task implements RunnableTask<CustomApiClient.Output> {
private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson();
private static final OkHttpClient HTTP_CLIENT = new OkHttpClient.Builder()
.connectTimeout(Duration.ofSeconds(30))
.readTimeout(Duration.ofSeconds(30))
.writeTimeout(Duration.ofSeconds(30))
.build();
@NotNull
@PluginProperty(dynamic = true)
@Schema(title = "API base URL")
private String baseUrl;
@NotNull
@PluginProperty(dynamic = true)
@Schema(title = "API endpoint")
private String endpoint;
@PluginProperty
@Schema(title = "HTTP method", defaultValue = "GET")
@Builder.Default
private Method method = Method.GET;
@PluginProperty(dynamic = true)
@Schema(title = "Request headers")
private Map<String, String> headers;
@PluginProperty(dynamic = true)
@Schema(title = "Query parameters")
private Map<String, String> queryParams;
@PluginProperty(dynamic = true)
@Schema(title = "Request body")
private Object body;
@PluginProperty
@Schema(title = "Timeout in seconds")
@Builder.Default
private Integer timeout = 30;
@PluginProperty
@Schema(title = "Store response as file")
@Builder.Default
private Boolean store = false;
public enum Method {
GET, POST, PUT, DELETE, PATCH
}
@Override
public Output run(RunContext runContext) throws Exception {
// Render dynamic properties
String renderedBaseUrl = runContext.render(baseUrl);
String renderedEndpoint = runContext.render(endpoint);
Map<String, String> renderedHeaders = renderMap(runContext, headers);
Map<String, String> renderedQueryParams = renderMap(runContext, queryParams);
Object renderedBody = renderBody(runContext, body);
// Build URL with query parameters
HttpUrl.Builder urlBuilder = HttpUrl.parse(renderedBaseUrl + renderedEndpoint).newBuilder();
if (renderedQueryParams != null) {
renderedQueryParams.forEach(urlBuilder::addQueryParameter);
}
// Build request
Request.Builder requestBuilder = new Request.Builder()
.url(urlBuilder.build());
// Add headers
if (renderedHeaders != null) {
renderedHeaders.forEach(requestBuilder::addHeader);
}
// Add body for methods that support it
if (renderedBody != null &&
(method == Method.POST || method == Method.PUT ||
method == Method.PATCH || method == Method.DELETE)) {
String bodyString;
if (renderedBody instanceof String) {
bodyString = (String) renderedBody;
requestBuilder.addHeader("Content-Type", "text/plain");
} else {
bodyString = OBJECT_MAPPER.writeValueAsString(renderedBody);
requestBuilder.addHeader("Content-Type", "application/json");
}
RequestBody requestBody = RequestBody.create(
bodyString,
MediaType.parse(requestBuilder.build().header("Content-Type"))
);
requestBuilder.method(method.name(), requestBody);
} else {
requestBuilder.method(method.name(), null);
}
// Execute request with custom timeout
OkHttpClient client = HTTP_CLIENT.newBuilder()
.connectTimeout(Duration.ofSeconds(timeout))
.readTimeout(Duration.ofSeconds(timeout))
.writeTimeout(Duration.ofSeconds(timeout))
.build();
try (Response response = client.newCall(requestBuilder.build()).execute()) {
String responseBody = response.body() != null ?
response.body().string() : null;
Map<String, Object> responseMap = new HashMap<>();
if (responseBody != null && !responseBody.isEmpty()) {
try {
responseMap = OBJECT_MAPPER.readValue(responseBody, Map.class);
} catch (Exception e) {
responseMap.put("raw", responseBody);
}
}
return Output.builder()
.statusCode(response.code())
.headers(response.headers().toMultimap())
.body(responseMap)
.success(response.isSuccessful())
.message(response.message())
.build();
}
}
    private Map<String, String> renderMap(RunContext runContext, Map<String, String> map) throws Exception {
        if (map == null) {
            return null;
        }
        // runContext.render() throws a checked exception, so use a plain loop
        // rather than Map.forEach with a lambda (which would not compile)
        Map<String, String> rendered = new HashMap<>();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            rendered.put(entry.getKey(), runContext.render(entry.getValue()));
        }
        return rendered;
    }
    private Object renderBody(RunContext runContext, Object body) throws Exception {
        if (body == null) {
            return null;
        }
        if (body instanceof String) {
            return runContext.render((String) body);
        } else if (body instanceof Map) {
            // Render string keys and values one level deep
            Map<?, ?> map = (Map<?, ?>) body;
            Map<Object, Object> rendered = new HashMap<>();
            for (Map.Entry<?, ?> entry : map.entrySet()) {
                Object renderedKey = entry.getKey() instanceof String
                    ? runContext.render((String) entry.getKey()) : entry.getKey();
                Object renderedValue = entry.getValue() instanceof String
                    ? runContext.render((String) entry.getValue()) : entry.getValue();
                rendered.put(renderedKey, renderedValue);
            }
            return rendered;
        }
        return body;
    }
@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(title = "HTTP status code")
private final Integer statusCode;
@Schema(title = "Response headers")
private final Map<String, java.util.List<String>> headers;
@Schema(title = "Response body")
private final Map<String, Object> body;
@Schema(title = "Whether the request was successful")
private final Boolean success;
@Schema(title = "Response message")
private final String message;
}
}
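And a usage sketch consuming the typed output, logging the status code with the core Log task:
- id: fetch_users
  type: com.company.kestra.plugin.custom.CustomApiClient
  baseUrl: https://api.company.com/v1
  endpoint: /users
  method: GET
  headers:
    Authorization: "Bearer {{ secret('API_TOKEN') }}"
  queryParams:
    status: active

- id: report
  type: io.kestra.plugin.core.log.Log
  message: "API call returned {{ outputs.fetch_users.statusCode }} (success: {{ outputs.fetch_users.success }})"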
Plugin Configuration (pom.xml)
<!-- pom.xml for custom plugin -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.company.kestra</groupId>
<artifactId>kestra-plugin-custom</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>Kestra Custom Plugin</name>
<description>Custom plugins for Company's systems</description>
<properties>
<kestra.version>0.15.0</kestra.version>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Kestra Core -->
<dependency>
<groupId>io.kestra</groupId>
<artifactId>core</artifactId>
<version>${kestra.version}</version>
<scope>provided</scope>
</dependency>
<!-- External Dependencies -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.11.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>io.kestra</groupId>
<artifactId>junit5</artifactId>
<version>${kestra.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Testing Custom Plugins
// src/test/java/com/company/kestra/plugin/custom/CustomApiClientTest.java
package com.company.kestra.plugin.custom;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
@MicronautTest
class CustomApiClientTest {
@Inject
private RunContextFactory runContextFactory;
@Test
void testApiCall() throws Exception {
// Create task instance
CustomApiClient task = CustomApiClient.builder()
.id("test-api")
.type(CustomApiClient.class.getName())
.baseUrl("https://httpbin.org")
.endpoint("/get")
.method(CustomApiClient.Method.GET)
.headers(Map.of(
"User-Agent", "Kestra-Plugin-Test"
))
.queryParams(Map.of(
"test", "value"
))
.build();
// Execute task
CustomApiClient.Output output = task.run(
TestsUtils.mockRunContext(
runContextFactory,
task,
new HashMap<>()
)
);
// Verify results
assertThat(output.getStatusCode(), is(200));
assertThat(output.getSuccess(), is(true));
assertThat(output.getBody(), notNullValue());
// Verify response contains our query param
Map<String, Object> args = (Map<String, Object>)
((Map<String, Object>) output.getBody().get("args"));
assertThat(args.get("test"), is("value"));
}
@Test
void testPostRequest() throws Exception {
Map<String, Object> requestBody = Map.of(
"name", "John Doe",
"email", "john@example.com",
"active", true
);
CustomApiClient task = CustomApiClient.builder()
.id("test-post")
.type(CustomApiClient.class.getName())
.baseUrl("https://httpbin.org")
.endpoint("/post")
.method(CustomApiClient.Method.POST)
.body(requestBody)
.build();
CustomApiClient.Output output = task.run(
TestsUtils.mockRunContext(
runContextFactory,
task,
new HashMap<>()
)
);
assertThat(output.getStatusCode(), is(200));
Map<String, Object> json = (Map<String, Object>)
output.getBody().get("json");
assertThat(json.get("name"), is("John Doe"));
assertThat(json.get("active"), is(true));
}
}
Plugin Documentation
# Custom Database Plugin
## Overview
This plugin allows querying custom database systems from Kestra workflows.
## Installation
1. Build the plugin:

   ```bash
   mvn clean package
   ```

2. Copy the JAR to Kestra's plugins directory:

   ```bash
   cp target/kestra-plugin-custom-1.0.0.jar /opt/kestra/plugins/
   ```

3. Restart Kestra.
## Usage
### Basic Query
id: query_custom_db
type: com.company.kestra.plugin.custom.CustomDatabaseQuery
connectionString: jdbc:custom://host:5432/database
username: admin
password: "{{ secret('DB_PASSWORD') }}"
sql: |
SELECT * FROM users
WHERE created_at > '2024-01-01'
fetch: true
### Update Query
id: update_records
type: com.company.kestra.plugin.custom.CustomDatabaseQuery
connectionString: jdbc:custom://host:5432/database
sql: |
UPDATE users
SET status = 'inactive'
WHERE last_login < '2024-01-01'
fetch: false
## Properties
| Property | Type | Required | Description |
|----------|------|----------|-------------|
| connectionString | String | Yes | JDBC connection string |
| username | String | No | Database username |
| password | String | No | Database password |
| sql | String | Yes | SQL query to execute |
| fetch | Boolean | No | Whether to fetch results (default: true) |
| fetchSize | Integer | No | Fetch size for large results (default: 1000) |
| queryTimeout | Integer | No | Query timeout in seconds (default: 30) |
## Outputs
For SELECT queries (fetch: true):
{
"rowCount": 100,
"rows": [...],
"query": "SELECT * FROM users"
}
For UPDATE/INSERT/DELETE queries (fetch: false):
{
"rowCount": 50,
"query": "UPDATE users SET status = 'active'"
}
## Examples
See the examples directory for more usage scenarios.
Part 4: Plugin Management and Optimization
Plugin Registry Configuration
# kestra.yml - Plugin configuration
kestra:
plugins:
# Plugin repositories
repositories:
- id: kestra-io
url: https://repo.kestra.io/repository/maven-public/
type: maven
releases: true
snapshots: true
- id: company-private
url: https://nexus.company.com/repository/maven-releases/
type: maven
releases: true
snapshots: false
authentication:
username: "{{ secret('NEXUS_USER') }}"
password: "{{ secret('NEXUS_PASS') }}"
# Plugin locations
locations:
- /opt/kestra/plugins # System plugins
- /app/plugins # User plugins
# Plugin scanning
scan:
enabled: true
interval: PT5M # Scan every 5 minutes
# Plugin isolation
isolation:
enabled: true
mode: PROCESS # PROCESS or THREAD
process:
maxThreads: 10
maxMemory: 1Gi
timeout: PT5M
# Cache configuration
cache:
enabled: true
size: 1000
expireAfterWrite: PT1H
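On the flow side, repeated plugin configuration can be factored out with pluginDefaults, which applies shared properties to every task of a given type (assuming a Kestra version recent enough to support it). A minimal sketch with placeholder connection values:
id: defaults-demo
namespace: demo.plugins

pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql.Query
    values:
      url: jdbc:postgresql://localhost:5432/mydb   # placeholder
      username: "{{ secret('PG_USER') }}"
      password: "{{ secret('PG_PASS') }}"

tasks:
  - id: count_users
    type: io.kestra.plugin.jdbc.postgresql.Query
    sql: SELECT count(*) FROM users
    fetch: true

  - id: count_orders
    type: io.kestra.plugin.jdbc.postgresql.Query
    sql: SELECT count(*) FROM orders
    fetch: true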
Plugin Version Management
# flows/plugin-management.yml
id: plugin-version-manager
namespace: system.plugins
tasks:
- id: check_plugin_updates
type: io.kestra.plugin.core.http.Request
uri: "https://repo.kestra.io/repository/maven-public/io/kestra/plugin/maven-metadata.xml"
store: true
- id: parse_plugin_versions
type: io.kestra.plugin.scripts.python.Script
inputFiles:
metadata.xml: "{{ outputs.check_plugin_updates.uri }}"
script: |
import xml.etree.ElementTree as ET
import json
tree = ET.parse('metadata.xml')
root = tree.getroot()
versions = []
for version in root.find('.//versioning/versions'):
versions.append(version.text)
latest = root.find('.//versioning/latest').text
release = root.find('.//versioning/release').text
result = {
'total_versions': len(versions),
'latest_version': latest,
'release_version': release,
'all_versions': versions[-10:] # Last 10 versions
}
print(json.dumps(result, indent=2))
- id: update_plugins_if_needed
type: io.kestra.plugin.core.flow.EachSequential
value: ["postgresql", "s3", "snowflake", "bigquery"]
tasks:
- id: "check_{{ task.value }}_version"
type: io.kestra.plugin.core.debug.Return
format: "{{ task.value }}: 1.2.3" # Would be actual version check
- id: "update_{{ task.value }}_if_outdated"
type: io.kestra.plugin.core.flow.Subflow
namespace: system.maintenance
flowId: update-plugin
wait: true
inputs:
  plugin_name: "kestra-plugin-jdbc-{{ task.value }}"
  target_version: "latest"
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: "{{ true }}" # Actual version comparison
Plugin Performance Optimization
# flows/plugin-performance.yml
id: plugin-performance-optimization
namespace: demo.performance
tasks:
# Batch operations instead of individual calls
- id: batch_database_operations
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
-- Batch insert
INSERT INTO users (id, name, email) VALUES
{% for user in inputs.users %}
('{{ user.id }}', '{{ user.name }}', '{{ user.email }}'){% if not loop.last %},{% endif %}
{% endfor %}
ON CONFLICT (id) DO UPDATE SET
name = EXCLUDED.name,
email = EXCLUDED.email;
fetch: false
# Connection pooling
- id: connection_pool_config
type: io.kestra.plugin.jdbc.postgresql.Query
url: jdbc:postgresql://localhost:5432/db
username: user
password: pass
sql: SELECT * FROM large_table
connectionProperties:
maximumPoolSize: "10"
minimumIdle: "5"
connectionTimeout: "30000"
idleTimeout: "600000"
maxLifetime: "1800000"
# Parallel plugin execution
- id: parallel_plugin_execution
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: api_call_1
type: io.kestra.plugin.core.http.Request
uri: "https://api.example.com/data1"
- id: api_call_2
type: io.kestra.plugin.core.http.Request
uri: "https://api.example.com/data2"
- id: api_call_3
type: io.kestra.plugin.core.http.Request
uri: "https://api.example.com/data3"
# Plugin caching
- id: cached_operation
type: io.kestra.plugin.core.cache.Cached
key: "expensive_operation:{{ inputs.user_id }}"
ttl: PT1H
tasks:
- id: expensive_computation
type: io.kestra.plugin.scripts.python.Script
script: |
import time
import json
# Simulate expensive computation
time.sleep(5)
result = {"computed": True, "value": 42}
print(json.dumps(result))
# Memory optimization for large data
- id: process_large_data
type: io.kestra.plugin.jdbc.postgresql.Query
sql: SELECT * FROM very_large_table
fetch: true
fetchSize: 10000 # Process in chunks
chunkSize: 1000 # Smaller chunks for memory efficiency
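Resilience tuning belongs next to performance tuning: any task can carry a timeout and a retry policy. A hedged sketch of an exponential retry on an HTTP call (property names follow Kestra's task retry model; adjust to your version):
- id: resilient_api_call
  type: io.kestra.plugin.core.http.Request
  uri: https://api.example.com/data
  timeout: PT30S          # fail the attempt after 30 seconds
  retry:
    type: exponential     # back off between attempts
    interval: PT5S
    maxInterval: PT1M
    maxAttempt: 5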
Plugin Monitoring and Metrics
# flows/plugin-monitoring.yml
id: plugin-monitoring-dashboard
namespace: system.monitoring
tasks:
- id: collect_plugin_metrics
type: io.kestra.plugin.jdbc.postgresql.Query
fetch: true
sql: |
WITH plugin_stats AS (
SELECT
SUBSTRING(task_type FROM 'plugin\.([^.]*)') as plugin_name,
COUNT(*) as execution_count,
AVG(EXTRACT(EPOCH FROM (end_date - start_date))) * 1000 as avg_duration_ms,
PERCENTILE_CONT(0.95) WITHIN GROUP (
ORDER BY EXTRACT(EPOCH FROM (end_date - start_date)) * 1000
) as p95_duration_ms,
SUM(CASE WHEN state = 'FAILED' THEN 1 ELSE 0 END) as failure_count
FROM executions
WHERE start_date >= NOW() - INTERVAL '7 days'
AND task_type LIKE 'io.kestra.plugin.%'
GROUP BY plugin_name
),
plugin_health AS (
SELECT
plugin_name,
execution_count,
avg_duration_ms,
p95_duration_ms,
failure_count,
CASE
WHEN failure_count = 0 THEN 'HEALTHY'
WHEN failure_count::float / execution_count < 0.01 THEN 'WARNING'
ELSE 'CRITICAL'
END as health_status
FROM plugin_stats
)
SELECT * FROM plugin_health
ORDER BY execution_count DESC
- id: generate_plugin_report
type: io.kestra.plugin.scripts.python.Script
inputFiles:
metrics.json: |
{{ outputs.collect_plugin_metrics.rows | tojson }}
script: |
import json
import pandas as pd
from datetime import datetime
with open('metrics.json') as f:
    data = json.load(f)
df = pd.DataFrame(data)
# Create summary report
report = {
'generated_at': datetime.now().isoformat(),
'total_plugins': len(df),
'total_executions': df['execution_count'].sum(),
'health_summary': {
'healthy': len(df[df['health_status'] == 'HEALTHY']),
'warning': len(df[df['health_status'] == 'WARNING']),
'critical': len(df[df['health_status'] == 'CRITICAL'])
},
'top_plugins_by_usage': df.nlargest(10, 'execution_count').to_dict('records'),
'slowest_plugins': df.nlargest(5, 'p95_duration_ms').to_dict('records'),
'most_failing_plugins': df.nlargest(5, 'failure_count').to_dict('records')
}
print(json.dumps(report, indent=2))
- id: alert_on_plugin_issues
type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
url: "{{ secret('SLACK_WEBHOOK_URL') }}"
payload: |
{
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "⚠️ Plugin Health Alert"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Critical Plugins:*\n{% for plugin in outputs.generate_plugin_report.vars.most_failing_plugins %}- {{ plugin.plugin_name }}: {{ plugin.failure_count }} failures\n{% endfor %}"
}
}
]
}
conditions:
- type: io.kestra.plugin.core.condition.Expression
expression: "{{ outputs.generate_plugin_report.vars.health_summary.critical > 0 }}"
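To run this health check on a schedule rather than manually, a core Schedule trigger can be appended to the flow (the cron expression is an example):
triggers:
  - id: daily_plugin_health
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 7 * * *"   # every day at 07:00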
Part 5: Enterprise Plugin Patterns
Plugin Factory Pattern
// Plugin factory for creating plugin instances dynamically
public class PluginFactory {
private final Map<String, PluginCreator> creators = new HashMap<>();
public PluginFactory() {
// Register plugin creators
creators.put("salesforce", new SalesforcePluginCreator());
creators.put("sap", new SapPluginCreator());
creators.put("oracle", new OraclePluginCreator());
}
public Task createPlugin(String system, Map<String, Object> config) {
PluginCreator creator = creators.get(system.toLowerCase());
if (creator == null) {
throw new IllegalArgumentException("Unknown system: " + system);
}
return creator.create(config);
}
interface PluginCreator {
Task create(Map<String, Object> config);
}
static class SalesforcePluginCreator implements PluginCreator {
@Override
public Task create(Map<String, Object> config) {
return SalesforceQuery.builder()
.instanceUrl((String) config.get("instanceUrl"))
.clientId((String) config.get("clientId"))
.clientSecret((String) config.get("clientSecret"))
.soql((String) config.get("query"))
.build();
}
}
}
Plugin Adapter Pattern
# flows/plugin-adapter.yml
id: legacy-system-adapter
namespace: enterprise.integration
tasks:
# Adapter for legacy SOAP service
- id: soap_to_rest_adapter
type: io.kestra.plugin.scripts.python.Script
script: |
import zeep
import requests
import json
# Legacy SOAP client
client = zeep.Client('https://legacy.company.com/wsdl')
# Convert REST request to SOAP
request_data = {{ inputs.rest_request | tojson }}
soap_response = client.service.ProcessRequest(
method=request_data['method'],
params=json.dumps(request_data['params'])
)
# Convert SOAP response to REST format
rest_response = {
'status': 'success' if soap_response.success else 'error',
'data': json.loads(soap_response.data),
'timestamp': soap_response.timestamp
}
print(json.dumps(rest_response))
# Adapter for mainframe (via FTP)
- id: mainframe_adapter
type: io.kestra.plugin.fs.sftp.Download
host: mainframe.company.com
port: 22
username: "{{ secret('MAINFRAME_USER') }}"
password: "{{ secret('MAINFRAME_PASS') }}"
from: "/reports/{{ execution.startDate | date('yyyyMMdd') }}.txt"
- id: parse_mainframe_data
type: io.kestra.plugin.scripts.python.Script
inputFiles:
mainframe_data.txt: "{{ outputs.mainframe_adapter.uri }}"
script: |
# Parse fixed-width mainframe format
import struct
import pandas as pd
with open('mainframe_data.txt', 'r') as f:
lines = f.readlines()
# Define fixed-width format (example)
format_spec = [
('customer_id', 10),
('name', 30),
('balance', 12),
('status', 1)
]
records = []
for line in lines:
record = {}
pos = 0
for field, width in format_spec:
value = line[pos:pos+width].strip()
record[field] = value
pos += width
records.append(record)
df = pd.DataFrame(records)
df.to_parquet('mainframe_parsed.parquet', index=False)
Plugin Security Patterns
# flows/secure-plugin-usage.yml
id: secure-plugin-patterns
namespace: enterprise.security
tasks:
# Secrets management
- id: secure_database_connection
type: io.kestra.plugin.jdbc.postgresql.Query
url: "jdbc:postgresql://{{ secret('DB_HOST') }}:5432/{{ secret('DB_NAME') }}"
username: "{{ secret('DB_USER') }}"
password: "{{ secret('DB_PASSWORD') }}"
sql: SELECT * FROM secure_table
connectionProperties:
ssl: "true"
sslmode: "verify-full"
sslrootcert: "/certs/ca.pem"
# Encrypted data handling
- id: process_encrypted_data
type: io.kestra.plugin.scripts.python.Script
script: |
import gnupg
import pandas as pd
from io import StringIO
# Initialize GPG
gpg = gnupg.GPG(gnupghome='/app/.gnupg')
# Decrypt data
with open('encrypted_data.pgp', 'rb') as f:
decrypted = gpg.decrypt_file(f, passphrase='{{ secret('GPG_PASSPHRASE') }}')
if decrypted.ok:
# Process decrypted data
df = pd.read_csv(StringIO(str(decrypted)))
# ... processing ...
# Re-encrypt results
encrypted = gpg.encrypt(
df.to_csv(index=False),
recipients=['data-team@company.com'],
always_trust=True
)
with open('results.pgp', 'w') as f:
f.write(str(encrypted))
else:
raise Exception(f"Decryption failed: {decrypted.stderr}")
docker:
image: python:3.11-slim
volumes:
- /secrets/gnupg:/app/.gnupg:ro
# Audit logging for sensitive operations
- id: audited_plugin_operation
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: log_operation_start
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
INSERT INTO audit_log
(operation, user_id, start_time, parameters, execution_id)
VALUES (
'pii_data_extract',
'{{ execution.user }}',
NOW(),
'{{ inputs | tojson }}',
'{{ execution.id }}'
)
- id: perform_sensitive_operation
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
SELECT
user_id,
email,
phone_number,
masked_ssn
FROM pii_data
WHERE department = '{{ inputs.department }}'
fetch: true
- id: log_operation_complete
type: io.kestra.plugin.jdbc.postgresql.Query
sql: |
UPDATE audit_log
SET
end_time = NOW(),
row_count = {{ outputs.perform_sensitive_operation.rowCount }},
status = 'SUCCESS'
WHERE execution_id = '{{ execution.id }}'
Plugin Testing Framework
# flows/plugin-testing-framework.yml
id: plugin-testing-pipeline
namespace: quality.assurance
inputs:
- name: plugin_name
type: STRING
required: true
- name: test_scenarios
type: JSON
required: true
tasks:
- id: setup_test_environment
type: io.kestra.plugin.core.flow.Parallel
tasks:
- id: start_test_database
type: io.kestra.plugin.docker.Run
image: postgres:15
env:
POSTGRES_PASSWORD: test
POSTGRES_DB: test_db
ports:
- 5432:5432
wait: true
maxDuration: PT1M
- id: start_test_api
type: io.kestra.plugin.docker.Run
image: mockserver/mockserver
command: -serverPort 1080
ports:
- 1080:1080
wait: true
maxDuration: PT1M
- id: run_plugin_tests
type: io.kestra.plugin.core.flow.EachSequential
value: "{{ inputs.test_scenarios }}"
tasks:
- id: "execute_test_{{ taskloop.index }}"
type: io.kestra.plugin.core.flow.Switch
value: "{{ task.value.type }}"
cases:
database:
- id: run_database_test
type: io.kestra.plugin.jdbc.postgresql.Query
url: jdbc:postgresql://localhost:5432/test_db
username: postgres
password: test
sql: "{{ task.value.query }}"
fetch: true
api:
- id: setup_api_mock
type: io.kestra.plugin.core.http.Request
uri: http://localhost:1080/mockserver/expectation
method: PUT
body: "{{ task.value.mock_config | tojson }}"
- id: test_api_call
type: io.kestra.plugin.core.http.Request
uri: "http://localhost:1080{{ task.value.endpoint }}"
method: "{{ task.value.method }}"
body: "{{ task.value.request_body | tojson }}"
performance:
- id: run_performance_test
type: io.kestra.plugin.core.flow.EachSequential
value: "{{ range(1, task.value.iterations + 1) }}"
maxParallel: 10
tasks:
- id: "iteration_{{ taskloop.value }}"
type: io.kestra.plugin.core.debug.Return
format: "{{ task.value }}"
- id: collect_test_results
type: io.kestra.plugin.scripts.python.Script
script: |
import json
import statistics
# Collect results from all tests
test_results = []
{% for i in range(1, inputs.test_scenarios|length + 1) %}
test_output = {{ outputs.run_plugin_tests.outputs['execute_test_' + i|string] | default({}) | tojson }}
if test_output:
test_results.append({
'scenario': {{ i }},
'success': test_output.get('state') == 'SUCCESS',
'duration': test_output.get('duration', 0)
})
{% endfor %}
# Calculate statistics
summary = {
'total_tests': len(test_results),
'passed_tests': sum(1 for r in test_results if r['success']),
'failed_tests': sum(1 for r in test_results if not r['success']),
'pass_rate': sum(1 for r in test_results if r['success']) / len(test_results) * 100 if test_results else 0,
'avg_duration': statistics.mean(r['duration'] for r in test_results) if test_results else 0,
'results': test_results
}
print(json.dumps(summary, indent=2))
- id: generate_test_report
type: io.kestra.plugin.notifications.email.EmailSend
to: "qa-team@company.com"
subject: "Plugin Test Results - {{ inputs.plugin_name }}"
htmlBody: |
<h1>Plugin Test Report</h1>
<p><strong>Plugin:</strong> {{ inputs.plugin_name }}</p>
<p><strong>Pass Rate:</strong> {{ outputs.collect_test_results.vars.pass_rate | round(2) }}%</p>
<p><strong>Total Tests:</strong> {{ outputs.collect_test_results.vars.total_tests }}</p>
<p><strong>Execution:</strong> {{ execution.id }}</p>
Part 6: Contributing to the Plugin Ecosystem
Open Source Contribution Guide
# 1. Fork the repository
https://github.com/kestra-io/kestra
# 2. Clone your fork
git clone https://github.com/YOUR_USERNAME/kestra.git
cd kestra
# 3. Create a feature branch
git checkout -b feature/new-plugin
# 4. Create plugin structure
mkdir -p plugin-<name>/src/main/java/io/kestra/plugin/<category>
mkdir -p plugin-<name>/src/test/java/io/kestra/plugin/<category>
# 5. Implement your plugin
# (Follow existing plugin patterns)
# 6. Write tests
# (Aim for >80% test coverage)
# 7. Update documentation
# - README.md in plugin directory
# - Add examples in @Plugin annotation
# - Update plugin list in docs
# 8. Run tests
mvn test -pl plugin-<name>
# 9. Submit pull request
git push origin feature/new-plugin
# Create PR on GitHub
Plugin Contribution Checklist
## New Plugin Contribution Checklist
### ✅ Code Quality
- [ ] Follows Kestra coding conventions
- [ ] Proper error handling and logging
- [ ] Input validation with @NotNull annotations
- [ ] Comprehensive JavaDoc comments
- [ ] No SonarQube issues
### ✅ Testing
- [ ] Unit tests for all public methods
- [ ] Integration tests with real systems
- [ ] Test edge cases and error conditions
- [ ] Mock external dependencies
- [ ] Test coverage > 80%
### ✅ Documentation
- [ ] README.md with usage examples
- [ ] @Plugin annotation with examples
- [ ] Swagger annotations for all properties
- [ ] Clear property descriptions
- [ ] Example flows in /examples directory
### ✅ Configuration
- [ ] Proper Maven pom.xml
- [ ] Plugin registration in parent pom
- [ ] Version aligned with Kestra core
- [ ] No conflicting dependencies
### ✅ Security
- [ ] Secrets marked with secret = true
- [ ] No hardcoded credentials
- [ ] Input sanitization
- [ ] SSL/TLS support where applicable
### ✅ Performance
- [ ] Connection pooling for databases
- [ ] Batch operations for bulk data
- [ ] Memory-efficient streaming
- [ ] Timeout configurations
Community Plugin Gallery
# Example of a well-documented community plugin
id: community-plugin-showcase
namespace: demo.community
tasks:
# Example: dbt plugin from community
- id: run_dbt_models
type: io.kestra.plugin.dbt.cli.Run
projectDir: "s3://my-dbt-project/"
profilesDir: "s3://my-dbt-profiles/"
target: prod
select:
- tag:hourly
- model_name+
exclude:
- tag:disabled
# Example: Great Expectations plugin
- id: data_quality_check
type: io.kestra.plugin.greatexpectations.Validation
expectationSuite: |
{
"expectation_suite_name": "customer_data",
"expectations": [
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {"column": "customer_id"}
}
]
}
data:
type: file
path: "{{ inputs.data_file }}"
# Example: Airbyte connector
- id: sync_with_airbyte
type: io.kestra.plugin.airbyte.Sync
connectionId: "{{ secret('AIRBYTE_CONNECTION_ID') }}"
workspaceId: "{{ secret('AIRBYTE_WORKSPACE_ID') }}"
apiUrl: "https://api.airbyte.com"
wait: true
# Example: Dagster integration
- id: trigger_dagster_pipeline
type: io.kestra.plugin.dagster.Run
repository: my_repo
pipeline: process_data
mode: default
runConfig:
resources:
io_manager:
config:
s3_bucket: "dagster-output"
Conclusion: Becoming a Plugin Master
Key Takeaways
Kestra's plugin ecosystem is vast: 150+ plugins covering every data engineering need
Custom plugins are straightforward: Java-based, with clear interfaces and patterns
Enterprise integration is possible: Create plugins for any system, no matter how legacy
Performance matters: Optimize plugins for production workloads
Community contribution is valuable: Share your plugins to help others
Next Steps for You
Audit your data stack: What systems need plugins?
Start simple: Create one custom plugin for an internal API
Contribute: Find a missing plugin in the ecosystem and build it
Share: Write a blog post about your plugin journey
Optimize: Review existing plugins for performance improvements
Resources
Official Documentation: https://kestra.io/docs/plugins/develop
Plugin Examples: https://github.com/kestra-io/examples
Community Forum: https://github.com/kestra-io/kestra/discussions
Plugin Template: https://github.com/kestra-io/plugin-template
In the next article, we'll dive into Monitoring and Observability in Kestra - how to ensure your workflows are running smoothly and troubleshoot when they're not.
Before the next article, try:
Create a simple custom plugin for a system you use
Optimize an existing plugin configuration for better performance
Contribute documentation or examples to an existing plugin
Set up a plugin testing pipeline for your team
Share your plugin creation experience in the Kestra community
Remember: Every great data platform starts with great plugins. What will you build?
Happy plugin developing! 🛠️


