Article 6: Mastering Kestra's Plugin Ecosystem


Extending Kestra to Every Corner of Your Data Stack

Introduction: The Power of Plugins

Imagine you're a master chef. You don't just have one knife; you have specialized tools for every task: a paring knife for delicate work, a chef's knife for chopping, a boning knife for meat, and a bread knife for, well, bread. That's what Kestra's plugin ecosystem gives you: specialized tools for every data engineering task.

In this article, you'll learn to:

  • Navigate Kestra's vast plugin library (150+ plugins and counting)

  • Create your own custom plugins for proprietary systems

  • Build plugin-based architectures for enterprise integration

  • Debug and optimize plugin performance

  • Contribute to the open-source plugin ecosystem

Quick Stats:

  • 150+ official plugins available

  • 1,000+ companies using Kestra plugins

  • 10,000+ plugin downloads per week

  • 0 lines of code needed for most integrations

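That last stat is the real selling point: most integrations are pure declarative YAML. A minimal sketch of a zero-code flow (the bucket name is illustrative, and task type names follow recent Kestra naming conventions):

```yaml
id: zero-code-ingest
namespace: demo.plugins

tasks:
  # Download a file over HTTP -- no custom code involved
  - id: fetch
    type: io.kestra.plugin.core.http.Download
    uri: https://example.com/export/users.csv

  # Push it straight to S3, reading from Kestra's internal storage
  - id: archive
    type: io.kestra.plugin.aws.s3.Upload
    from: "{{ outputs.fetch.uri }}"
    bucket: my-data-lake          # illustrative
    key: "imports/users.csv"
    region: us-east-1
```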

Part 1: The Plugin Architecture

Understanding Plugin Layers

Kestra's plugin architecture is like a LEGO system:

Application Layer (Your Flows)
    ↓
Plugin Interface (Java Interfaces)
    ↓
Plugin Implementation (Built-in/Custom)
    ↓
Target System (Databases, APIs, Cloud Services)

Core Plugin Types

# Every plugin belongs to one of these categories
plugins:
  - Input/Output:
    - Filesystems: S3, GCS, Azure Blob, Local, FTP
    - Databases: PostgreSQL, MySQL, Snowflake, BigQuery
    - APIs: REST, GraphQL, SOAP

  - Processing:
    - Scripting: Python, R, Node.js, Bash
    - Data Processing: Spark, Flink, Pandas
    - ML/AI: TensorFlow, PyTorch, Hugging Face

  - Messaging:
    - Queues: Kafka, RabbitMQ, AWS SQS
    - Streaming: Kafka Streams, Apache Pulsar

  - Cloud Services:
    - AWS: S3, Lambda, Glue, Redshift
    - GCP: BigQuery, Cloud Functions, Dataflow
    - Azure: Blob Storage, Data Factory, Synapse

  - Monitoring:
    - Metrics: Prometheus, Datadog, New Relic
    - Logging: ELK, Splunk, CloudWatch
    - Alerting: Slack, PagerDuty, Email

  - Custom:
    - Enterprise Systems: SAP, Salesforce, Oracle
    - Internal APIs: Your company's systems
    - Legacy Systems: Mainframes, AS/400

Plugin Anatomy: What Makes a Good Plugin?

// The anatomy of a Kestra plugin (simplified)
public class MyPlugin extends Task implements RunnableTask<MyPlugin.Output> {

    // 1. Configuration (exposed as YAML properties;
    //    dynamic = true allows {{ ... }} templating)
    @PluginProperty(dynamic = true)
    private String connectionString;

    @PluginProperty
    private Integer timeout;

    // 2. Execution logic
    @Override
    public Output run(RunContext runContext) throws Exception {
        // Render dynamic properties, then do your plugin logic here
        String url = runContext.render(connectionString);
        return Output.builder().result("success").build();
    }

    // 3. Output handling
    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        private final String result;
    }
}
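A flow author would then use this (hypothetical) task like any built-in one; the `type` is simply the fully-qualified class name:

```yaml
tasks:
  - id: my_task
    type: com.company.plugin.MyPlugin   # hypothetical package/class
    connectionString: "jdbc:custom://host/db"
    timeout: 30
```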

Part 2: Using Built-in Plugins

Plugin Discovery and Installation

# List installed plugins
kestra plugins list

# Install a plugin from Maven coordinates (group:artifact:version)
kestra plugins install io.kestra.plugin:plugin-jdbc-postgresql:LATEST

# ...or from a local JAR
kestra plugins install /path/to/plugin.jar

# Generate documentation for installed plugins
kestra plugins doc

# Subcommands vary slightly between Kestra versions;
# run `kestra plugins --help` to see what your version supports

Database Plugins Deep Dive

# flows/database-demo.yml
id: database-plugin-showcase
namespace: demo.plugins
description: Showcase of database plugins

tasks:
  # PostgreSQL - Full-featured SQL database
  - id: postgres_example
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: jdbc:postgresql://localhost:5432/mydb
    username: "{{ secret('PG_USER') }}"
    password: "{{ secret('PG_PASS') }}"
    sql: |
      -- Parameterized query
      SELECT * FROM users 
      WHERE created_at > '{{ execution.startDate | dateAdd(-7, 'DAYS') }}'
        AND status = '{{ inputs.user_status }}'
    fetchSize: 1000
    store: true   # fetch and store are mutually exclusive; store writes results to internal storage

  # Snowflake - Cloud data warehouse  
  - id: snowflake_example
    type: io.kestra.plugin.jdbc.snowflake.Query
    url: jdbc:snowflake://account.snowflakecomputing.com
    username: "{{ secret('SNOWFLAKE_USER') }}"
    password: "{{ secret('SNOWFLAKE_PASS') }}"
    role: SYSADMIN
    warehouse: COMPUTE_WH
    database: PRODUCTION
    schema: ANALYTICS
    sql: |
      COPY INTO @my_stage/users.parquet
      FROM (
        SELECT * FROM users 
        WHERE DAY = '{{ execution.startDate | date('yyyy-MM-dd') }}'
      )
      FILE_FORMAT = (TYPE = PARQUET)
    store: true

  # BigQuery - Serverless data warehouse
  - id: bigquery_example
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      SELECT 
        user_id,
        COUNT(*) as session_count,
        SUM(duration) as total_duration
      FROM `project.dataset.sessions`
      WHERE DATE(timestamp) = '{{ execution.startDate | date('yyyy-MM-dd') }}'
      GROUP BY user_id
    destinationTable:
      projectId: my-project
      datasetId: results
      tableId: daily_sessions
    writeDisposition: WRITE_TRUNCATE
    createDisposition: CREATE_IF_NEEDED

  # ClickHouse - OLAP database
  - id: clickhouse_example
    type: io.kestra.plugin.jdbc.clickhouse.Query
    url: jdbc:clickhouse://localhost:8123/analytics
    sql: |
      SELECT 
        toStartOfDay(timestamp) as day,
        uniq(user_id) as daily_active_users,
        count(*) as events
      FROM events
      WHERE timestamp >= '{{ execution.startDate | dateAdd(-30, 'DAYS') }}'
      GROUP BY day
      ORDER BY day
    fetch: true

  # Multi-database Join (Virtual Data Warehouse)
  - id: cross_database_join
    type: io.kestra.plugin.scripts.python.Script
    description: "Join data from multiple databases"
    inputFiles:
      postgres_data.json: "{{ outputs.postgres_example.uri }}"
      snowflake_data.json: "{{ outputs.snowflake_example.uri }}"
      bigquery_data.json: "{{ outputs.bigquery_example.uri }}"
    script: |
      import pandas as pd
      import json

      # Load from different sources
      pg_df = pd.read_json('postgres_data.json')
      sf_df = pd.read_json('snowflake_data.json')
      bq_df = pd.read_json('bigquery_data.json')

      # Perform cross-database join
      merged = pd.merge(
          pg_df, 
          sf_df, 
          on='user_id',
          how='left'
      ).merge(
          bq_df,
          on='user_id',
          how='left'
      )

      # Save result
      merged.to_parquet('cross_database_result.parquet', index=False)
      print(f"Merged {len(merged)} records from 3 databases")
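A showcase like this is rarely run by hand. Adding a Schedule trigger (a core Kestra trigger; the type name below follows recent Kestra naming) runs it nightly:

```yaml
triggers:
  - id: nightly
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 2 * * *"   # every day at 02:00
```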

Cloud Storage Plugins

# flows/cloud-storage.yml
id: cloud-storage-operations
namespace: demo.plugins

tasks:
  # AWS S3 Operations
  - id: s3_operations
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: upload_to_s3
        type: io.kestra.plugin.aws.s3.Upload
        from: "data/output.parquet"
        bucket: my-data-lake
        key: "raw/{{ execution.startDate | date('yyyy/MM/dd') }}/data.parquet"
        region: us-east-1
        accessKeyId: "{{ secret('AWS_ACCESS_KEY') }}"
        secretKeyId: "{{ secret('AWS_SECRET_KEY') }}"

      - id: list_s3_files
        type: io.kestra.plugin.aws.s3.List
        bucket: my-data-lake
        region: us-east-1
        prefix: "raw/{{ execution.startDate | date('yyyy/MM/dd') }}/"

      - id: sync_s3_to_local
        type: io.kestra.plugin.aws.s3.Download
        bucket: my-data-lake
        key: "{{ outputs.list_s3_files.objects[0].key }}"
        region: us-east-1

  # Google Cloud Storage
  - id: gcs_operations
    type: io.kestra.plugin.gcp.gcs.Upload
    from: "data/processed.parquet"
    bucket: my-gcp-bucket
    name: "processed/{{ execution.startDate | date('yyyy-MM-dd') }}.parquet"
    projectId: my-gcp-project

  # Azure Blob Storage
  - id: azure_operations
    type: io.kestra.plugin.azure.storage.blob.Upload
    from: "data/archive.parquet"
    connectionString: "{{ secret('AZURE_CONNECTION_STRING') }}"
    container: my-container
    name: "archive/{{ execution.startDate | date('yyyy/MM') }}/data.parquet"

  # Multi-cloud synchronization (Copy tasks only work within one provider;
  # to mirror across clouds, re-upload from Kestra's internal storage)
  - id: multi_cloud_sync
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: sync_to_aws
        type: io.kestra.plugin.aws.s3.Upload
        from: "{{ outputs.gcs_operations.uri }}"
        bucket: my-data-lake
        key: "gcs-mirror/{{ execution.startDate | date('yyyy-MM-dd') }}.parquet"
        region: us-east-1

      - id: sync_to_azure
        type: io.kestra.plugin.azure.storage.blob.Upload
        from: "{{ outputs.gcs_operations.uri }}"
        connectionString: "{{ secret('AZURE_CONNECTION_STRING') }}"
        container: my-container
        name: "gcs-mirror/{{ execution.startDate | date('yyyy-MM-dd') }}.parquet"
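Repeating the region and credentials on every AWS task gets noisy. Kestra's flow-level `pluginDefaults` (called `taskDefaults` in older versions) applies shared values to every task whose type matches a prefix:

```yaml
pluginDefaults:
  - type: io.kestra.plugin.aws
    values:
      region: us-east-1
      accessKeyId: "{{ secret('AWS_ACCESS_KEY') }}"
      secretKeyId: "{{ secret('AWS_SECRET_KEY') }}"
```

With this in place, the individual S3 tasks can drop those three properties entirely.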

Messaging and Streaming Plugins

# flows/messaging-demo.yml
id: messaging-patterns
namespace: demo.plugins

tasks:
  # Kafka Producer
  - id: produce_to_kafka
    type: io.kestra.plugin.kafka.Produce
    topic: user-events
    from:
      key: "{{ inputs.user_id }}"
      value: |
        {
          "event_type": "{{ inputs.event_type }}",
          "user_id": "{{ inputs.user_id }}",
          "timestamp": "{{ now() }}",
          "properties": {{ inputs.properties | toJson }}
        }
    properties:
      bootstrap.servers: "kafka-broker:9092"
      acks: "all"
      compression.type: "snappy"
    keySerializer: STRING
    valueSerializer: STRING

  # Kafka Consumer (Stream Processing)
  - id: consume_from_kafka
    type: io.kestra.plugin.kafka.Consume
    topic: user-events
    groupId: "etl-processor-{{ execution.startDate | date('yyyy-MM-dd') }}"
    maxRecords: 1000
    maxDuration: PT1M
    properties:
      bootstrap.servers: "kafka-broker:9092"
      auto.offset.reset: "earliest"

  # RabbitMQ
  - id: rabbitmq_example
    type: io.kestra.plugin.amqp.Write
    uri: "amqp://user:pass@rabbitmq:5672"
    exchange: events
    routingKey: "user.{{ inputs.user_type }}"
    content: |
      {{ inputs.event_data | toJson }}
    contentType: "application/json"

  # AWS SQS
  - id: sqs_example
    type: io.kestra.plugin.aws.sqs.Send
    queueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
    messageBody: |
      {{ inputs.message | toJson }}
    region: us-east-1

  # Real-time Data Pipeline
  - id: real_time_pipeline
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: consume_stream
        type: io.kestra.plugin.kafka.Consume
        topic: clickstream
        groupId: real-time-processor
        maxRecords: 100
        maxDuration: PT10S

      - id: process_in_realtime
        type: io.kestra.plugin.scripts.python.Script
        inputFiles:
          messages.json: "{{ outputs.consume_stream.uri }}"
        script: |
          import json
          import pandas as pd
          from datetime import datetime

          with open('messages.json', 'r') as f:
              messages = json.load(f)

          # Real-time processing
          events = []
          for msg in messages:
              event = json.loads(msg['value'])

              # Enrich in real-time
              event['processed_at'] = datetime.now().isoformat()
              event['hour_of_day'] = datetime.fromisoformat(event['timestamp']).hour
              event['is_peak'] = 9 <= event['hour_of_day'] <= 17

              events.append(event)

          # Batch write for efficiency
          if events:
              df = pd.DataFrame(events)
              df.to_parquet(f'processed_{datetime.now().strftime("%H%M%S")}.parquet')
              print(f"Processed {len(events)} events in real-time")

      - id: produce_results
        type: io.kestra.plugin.kafka.Produce
        topic: processed-events
        from:
          key: "{{ execution.startDate | date('yyyy-MM-dd HH') }}"
          value: "{{ outputs.process_in_realtime.outputFiles | toJson }}"
        properties:
          bootstrap.servers: "kafka-broker:9092"
        keySerializer: STRING
        valueSerializer: STRING
        # No condition needed: inside a Sequential block this task only
        # runs when the previous task succeeded
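The pipeline above pulls messages only when an execution runs. To launch an execution whenever messages arrive, the Kafka plugin also ships a trigger, configured with the same connection properties as `Consume`:

```yaml
triggers:
  - id: on_message
    type: io.kestra.plugin.kafka.Trigger
    topic: user-events
    groupId: kestra-trigger
    maxRecords: 500
    properties:
      bootstrap.servers: "kafka-broker:9092"
```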

Scripting and Data Processing Plugins

# flows/scripting-demo.yml
id: multi-language-scripts
namespace: demo.plugins

tasks:
  # Python with dependencies
  - id: python_with_deps
    type: io.kestra.plugin.scripts.python.Script
    description: "Python script with custom dependencies"
    script: |
      import pandas as pd
      import numpy as np
      from scipy import stats
      import requests

      # Your data science code here
      data = pd.DataFrame({
          'x': np.random.normal(0, 1, 1000),
          'y': np.random.normal(0, 1, 1000)
      })

      correlation = data['x'].corr(data['y'])
      print(f"Correlation: {correlation:.3f}")

      # Save results
      data.to_parquet('analysis_result.parquet', index=False)
    beforeCommands:
      - pip install pandas numpy scipy requests pyarrow
    docker:
      image: python:3.11-slim
    inputFiles:
      config.yaml: |
        dataset:
          name: "sample_data"
          version: "1.0"
    outputFiles:
      - "*.parquet"
      - "*.csv"

  # R for statistical analysis
  - id: r_analysis
    type: io.kestra.plugin.scripts.r.Script
    script: |
      library(dplyr)
      library(ggplot2)
      library(jsonlite)

      # Load data
      data <- read.csv("input_data.csv")

      # Statistical analysis
      result <- data %>%
        group_by(category) %>%
        summarise(
          mean = mean(value),
          sd = sd(value),
          n = n()
        )

      # Create visualization
      plot <- ggplot(data, aes(x=category, y=value)) +
        geom_boxplot() +
        theme_minimal()

      ggsave("boxplot.png", plot, width=8, height=6)

      # Output results
      write_json(result, "analysis_result.json")
      cat("Analysis complete\n")
    docker:
      image: rocker/tidyverse:latest

  # Node.js for API interactions
  - id: nodejs_api_client
    type: io.kestra.plugin.scripts.node.Script
    script: |
      const axios = require('axios');
      const fs = require('fs').promises;

      async function fetchData() {
          const response = await axios.get('https://api.example.com/data');
          const data = response.data;

          // Transform data
          const transformed = data.map(item => ({
              id: item.id,
              name: item.name.toUpperCase(),
              timestamp: new Date().toISOString()
          }));

          // Save to file
          await fs.writeFile(
              'api_data.json',
              JSON.stringify(transformed, null, 2)
          );

          console.log(`Fetched ${transformed.length} records`);
      }

      fetchData().catch(console.error);
    beforeCommands:
      - npm install axios
    docker:
      image: node:18-alpine

  # Bash for system operations
  - id: bash_operations
    type: io.kestra.plugin.scripts.shell.Commands
    commands:
      - find /data -name "*.parquet" -type f -mtime -7 | wc -l
      - du -sh /data/* | sort -hr
      - echo "Processing started at $(date)"
      - tar -czf archive_$(date +%Y%m%d_%H%M%S).tar.gz /data/input
    exitOnFailed: false

  # SQL-based transformation
  - id: sql_transformation
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      -- Load data from multiple formats
      CREATE TABLE users AS 
      SELECT * FROM read_parquet('users.parquet');

      CREATE TABLE orders AS
      SELECT * FROM read_csv('orders.csv');

      -- Complex transformation
      WITH user_stats AS (
        SELECT 
          u.user_id,
          u.join_date,
          COUNT(o.order_id) as order_count,
          SUM(o.amount) as total_spent,
          AVG(o.amount) as avg_order_value
        FROM users u
        LEFT JOIN orders o ON u.user_id = o.user_id
        GROUP BY u.user_id, u.join_date
      )
      SELECT * FROM user_stats
      ORDER BY total_spent DESC;
    store: true   # fetch and store are mutually exclusive; store keeps results in internal storage
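Script tasks talk to flaky external systems (APIs, registries, databases), so in practice they carry retry policies and timeouts. A per-task retry block in Kestra looks like this (values are illustrative):

```yaml
  - id: nodejs_api_client_with_retry
    type: io.kestra.plugin.scripts.node.Script
    retry:
      type: constant        # also available: exponential, random
      interval: PT30S
      maxAttempt: 3
    timeout: PT10M
    script: |
      console.log("retried up to 3 times, 30s apart, if it fails");
```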

Machine Learning Plugins

# flows/ml-pipeline.yml
id: ml-training-pipeline
namespace: demo.ml

tasks:
  # Data Preparation
  - id: prepare_training_data
    type: io.kestra.plugin.scripts.python.Script
    script: |
      import pandas as pd
      from sklearn.model_selection import train_test_split
      import joblib

      # Load and prepare data
      df = pd.read_parquet('raw_data.parquet')

      # Feature engineering
      df['feature1'] = df['value1'] * df['value2']
      df['feature2'] = df['value3'].apply(lambda x: 1 if x > 0 else 0)

      # Split data
      X = df[['feature1', 'feature2', 'value4']]
      y = df['target']

      X_train, X_test, y_train, y_test = train_test_split(
          X, y, test_size=0.2, random_state=42
      )

      # Save prepared data
      joblib.dump({
          'X_train': X_train,
          'X_test': X_test,
          'y_train': y_train,
          'y_test': y_test,
          'feature_names': X.columns.tolist()
      }, 'prepared_data.pkl')

      print(f"Prepared data: {len(X_train)} training, {len(X_test)} test samples")

  # Model Training with TensorFlow
  - id: train_tensorflow_model
    type: io.kestra.plugin.scripts.python.Script
    inputFiles:
      data.pkl: "{{ outputs.prepare_training_data.outputFiles['prepared_data.pkl'] }}"
    script: |
      import tensorflow as tf
      from tensorflow import keras
      import joblib
      import numpy as np

      # Load prepared data
      data = joblib.load('data.pkl')
      X_train, X_test, y_train, y_test = (
          data['X_train'], data['X_test'], 
          data['y_train'], data['y_test']
      )

      # Build model
      model = keras.Sequential([
          keras.layers.Dense(64, activation='relu', input_shape=(X_train.shape[1],)),
          keras.layers.Dropout(0.2),
          keras.layers.Dense(32, activation='relu'),
          keras.layers.Dense(1, activation='sigmoid')
      ])

      model.compile(
          optimizer='adam',
          loss='binary_crossentropy',
          metrics=['accuracy']
      )

      # Train model
      history = model.fit(
          X_train, y_train,
          validation_split=0.2,
          epochs=10,
          batch_size=32,
          verbose=1
      )

      # Evaluate
      test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
      print(f"Test accuracy: {test_acc:.4f}")

      # Save model
      model.save('trained_model.h5')

      # Save metrics (cast numpy scalars to plain floats so json can serialize them)
      metrics = {
          'test_accuracy': float(test_acc),
          'test_loss': float(test_loss),
          'training_history': {
              k: [float(v) for v in vals]
              for k, vals in history.history.items()
          }
      }

      import json
      with open('model_metrics.json', 'w') as f:
          json.dump(metrics, f)

    docker:
      image: tensorflow/tensorflow:2.13.0
      gpu: true  # Enable GPU if available

  # Model Serving with Triton (illustrative: assumes a Triton deployment
  # plugin is available in your installation)
  - id: deploy_model_triton
    type: io.kestra.plugin.triton.DeployModel
    modelRepository: "/models"
    modelName: "customer-churn"
    modelVersion: "1"
    modelFile: "{{ outputs.train_tensorflow_model.outputFiles['trained_model.h5'] }}"
    config: |
      platform: "tensorflow_savedmodel"
      max_batch_size: 64
      input [
        {
          name: "input"
          data_type: TYPE_FP32
          dims: [ -1, 3 ]
        }
      ]
      output [
        {
          name: "output"
          data_type: TYPE_FP32
          dims: [ -1, 1 ]
        }
      ]

  # ML Monitoring with Evidently
  - id: monitor_model_drift
    type: io.kestra.plugin.scripts.python.Script
    script: |
      import pandas as pd
      import numpy as np
      from evidently.report import Report
      from evidently.metric_preset import DataDriftPreset
      import joblib

      # Load reference data (training data)
      ref_data = pd.read_parquet('reference_data.parquet')

      # Load current data (production data)
      curr_data = pd.read_parquet('current_data.parquet')

      # Generate drift report
      drift_report = Report(metrics=[DataDriftPreset()])
      drift_report.run(
          reference_data=ref_data,
          current_data=curr_data
      )

      # Save report
      drift_report.save_html('data_drift_report.html')

      # Extract metrics
      drift_metrics = drift_report.as_dict()
      drift_score = drift_metrics['metrics'][0]['result']['dataset_drift']

      print(f"Data drift detected: {drift_score}")

      # Alert if drift exceeds threshold
      if drift_score:
          print("ALERT: Significant data drift detected!")
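A print statement is not an alert. In a real pipeline the drift check would be followed by a notification task, for example via Kestra's Slack notifications plugin (the webhook secret name is illustrative):

```yaml
  - id: alert_on_drift
    type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
    url: "{{ secret('SLACK_WEBHOOK') }}"
    payload: |
      {
        "text": "Data drift detected in customer-churn model inputs"
      }
```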

Part 3: Creating Custom Plugins

Plugin Development Environment Setup

# Create a plugin project. (The official starting point today is the
# Gradle-based github.com/kestra-io/plugin-template repository; a Maven
# archetype workflow is shown here for illustration.)
mvn archetype:generate \
  -DarchetypeGroupId=io.kestra \
  -DarchetypeArtifactId=kestra-plugin-archetype \
  -DarchetypeVersion=0.15.0 \
  -DgroupId=com.company.kestra \
  -DartifactId=kestra-plugin-custom \
  -Dversion=1.0.0 \
  -Dpackage=com.company.kestra.plugin.custom

# Build the plugin
cd kestra-plugin-custom
mvn clean package

# Run unit tests
mvn test

# Deploy to Kestra by copying the shaded JAR into the plugins directory
cp target/kestra-plugin-custom-1.0.0.jar /path/to/kestra/plugins/
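For local testing, Kestra loads plugin JARs from its configured plugins path; with the official Docker image that is conventionally a mounted volume (paths below are illustrative):

```yaml
# docker-compose.yml fragment
services:
  kestra:
    image: kestra/kestra:latest
    command: server standalone
    volumes:
      - ./plugins:/app/plugins   # drop custom plugin JARs here, then restart
```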

Simple Custom Plugin Example

// src/main/java/com/company/kestra/plugin/custom/HelloWorld.java
package com.company.kestra.plugin.custom;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import javax.validation.constraints.NotNull;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Plugin(
    examples = {
        @Example(
            title = "Say hello to someone",
            code = {
                "id: hello",
                "type: com.company.kestra.plugin.custom.HelloWorld",
                "name: John"
            }
        )
    }
)
@Schema(
    title = "Hello World Task",
    description = "A simple custom plugin that greets someone"
)
public class HelloWorld extends Task implements RunnableTask<HelloWorld.Output> {

    @PluginProperty(dynamic = true)
    @Schema(title = "Name to greet")
    @NotNull
    private String name;

    @PluginProperty(dynamic = true)
    @Schema(title = "Greeting message")
    private String message;

    @Override
    public Output run(RunContext runContext) throws Exception {
        // Render dynamic properties first so templates like {{ inputs.name }} work
        String renderedName = runContext.render(name);
        String greeting = message != null ? runContext.render(message) : "Hello";
        String fullMessage = greeting + ", " + renderedName + "!";

        // Log the greeting
        runContext.logger().info(fullMessage);

        return Output.builder()
            .greeting(fullMessage)
            .renderedName(renderedName)
            .executionTime(System.currentTimeMillis())
            .build();
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(title = "The greeting message")
        private final String greeting;

        @Schema(title = "Rendered name")
        private final String renderedName;

        @Schema(title = "Execution time in milliseconds")
        private final Long executionTime;
    }
}
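Once the JAR is deployed, the task is used like any built-in plugin, and its outputs are addressable downstream:

```yaml
id: greet
namespace: demo.custom

tasks:
  - id: hello
    type: com.company.kestra.plugin.custom.HelloWorld
    name: "World"
    message: "Howdy"
  # Downstream tasks can reference {{ outputs.hello.greeting }}
```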

Database Query Plugin Example

// src/main/java/com/company/kestra/plugin/custom/CustomDatabaseQuery.java
package com.company.kestra.plugin.custom;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import javax.validation.constraints.NotNull;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Plugin(
    examples = {
        @Example(
            title = "Query custom database",
            code = {
                "id: custom_query",
                "type: com.company.kestra.plugin.custom.CustomDatabaseQuery",
                "connectionString: jdbc:custom://host:port/db",
                "username: admin",
                "password: \"{{ secret('CUSTOM_DB_PASS') }}\"",
                "sql: SELECT * FROM users WHERE active = true"
            }
        )
    }
)
@Schema(title = "Custom Database Query", description = "Query a custom database system")
public class CustomDatabaseQuery extends Task implements RunnableTask<CustomDatabaseQuery.Output> {

    @NotNull
    @PluginProperty(dynamic = true)
    @Schema(title = "JDBC connection string")
    private String connectionString;

    @PluginProperty(dynamic = true)
    @Schema(title = "Database username")
    private String username;

    @PluginProperty(dynamic = true)
    @Schema(title = "Database password", secret = true)
    private String password;

    @NotNull
    @PluginProperty(dynamic = true)
    @Schema(title = "SQL query to execute")
    private String sql;

    @PluginProperty
    @Schema(title = "Fetch results (true) or execute without results (false)")
    @Builder.Default
    private Boolean fetch = true;

    @PluginProperty
    @Schema(title = "Fetch size for large results")
    @Builder.Default
    private Integer fetchSize = 1000;

    @PluginProperty
    @Schema(title = "Query timeout in seconds")
    @Builder.Default
    private Integer queryTimeout = 30;

    @Override
    public Output run(RunContext runContext) throws Exception {
        // Render dynamic properties
        String renderedConnectionString = runContext.render(connectionString);
        String renderedUsername = runContext.render(username);
        String renderedPassword = runContext.render(password);
        String renderedSql = runContext.render(sql);

        Connection connection = null;
        PreparedStatement statement = null;
        ResultSet resultSet = null;

        try {
            // Load custom JDBC driver
            Class.forName("com.custom.Driver");

            // Create connection
            connection = DriverManager.getConnection(
                renderedConnectionString,
                renderedUsername,
                renderedPassword
            );

            // Create statement
            statement = connection.prepareStatement(renderedSql);
            statement.setFetchSize(fetchSize);
            statement.setQueryTimeout(queryTimeout);

            if (fetch) {
                // Execute query and fetch results
                resultSet = statement.executeQuery();

                List<Map<String, Object>> rows = new ArrayList<>();
                ResultSetMetaData metaData = resultSet.getMetaData();
                int columnCount = metaData.getColumnCount();

                while (resultSet.next()) {
                    Map<String, Object> row = new HashMap<>();
                    for (int i = 1; i <= columnCount; i++) {
                        String columnName = metaData.getColumnName(i);
                        Object value = resultSet.getObject(i);
                        row.put(columnName, value);
                    }
                    rows.add(row);
                }

                return Output.builder()
                    .rowCount(rows.size())
                    .rows(rows)
                    .query(renderedSql)
                    .build();

            } else {
                // Execute update
                int affectedRows = statement.executeUpdate();

                return Output.builder()
                    .rowCount(affectedRows)
                    .query(renderedSql)
                    .build();
            }

        } finally {
            // Clean up resources
            if (resultSet != null) {
                try { resultSet.close(); } catch (SQLException ignored) {}
            }
            if (statement != null) {
                try { statement.close(); } catch (SQLException ignored) {}
            }
            if (connection != null) {
                try { connection.close(); } catch (SQLException ignored) {}
            }
        }
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(title = "Number of rows affected or returned")
        private final Integer rowCount;

        @Schema(title = "Query results (only for SELECT queries)")
        private final List<Map<String, Object>> rows;

        @Schema(title = "Executed SQL query")
        private final String query;
    }
}
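Wired into a flow, the query task's `Output` fields become available to later tasks (the Log task type below follows recent Kestra naming):

```yaml
tasks:
  - id: custom_query
    type: com.company.kestra.plugin.custom.CustomDatabaseQuery
    connectionString: "jdbc:custom://host:5432/db"
    username: admin
    password: "{{ secret('CUSTOM_DB_PASS') }}"
    sql: SELECT id, email FROM users WHERE active = true

  - id: log_count
    type: io.kestra.plugin.core.log.Log
    message: "Fetched {{ outputs.custom_query.rowCount }} rows"
```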

REST API Plugin Example

// src/main/java/com/company/kestra/plugin/custom/CustomApiClient.java
package com.company.kestra.plugin.custom;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import okhttp3.*;

import javax.validation.constraints.NotNull;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Plugin(
    examples = {
        @Example(
            title = "Call custom API",
            code = {
                "id: call_custom_api",
                "type: com.company.kestra.plugin.custom.CustomApiClient",
                "baseUrl: https://api.company.com/v1",
                "endpoint: /users",
                "method: GET",
                "headers:",
                "  Authorization: Bearer {{ secret('API_TOKEN') }}",
                "queryParams:",
                "  status: active",
                "  limit: 100"
            }
        )
    }
)
@Schema(title = "Custom API Client", description = "Interact with a custom REST API")
public class CustomApiClient extends Task implements RunnableTask<CustomApiClient.Output> {

    private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson();
    private static final OkHttpClient HTTP_CLIENT = new OkHttpClient.Builder()
        .connectTimeout(Duration.ofSeconds(30))
        .readTimeout(Duration.ofSeconds(30))
        .writeTimeout(Duration.ofSeconds(30))
        .build();

    @NotNull
    @PluginProperty(dynamic = true)
    @Schema(title = "API base URL")
    private String baseUrl;

    @NotNull
    @PluginProperty(dynamic = true)
    @Schema(title = "API endpoint")
    private String endpoint;

    @PluginProperty
    @Schema(title = "HTTP method", defaultValue = "GET")
    @Builder.Default
    private Method method = Method.GET;

    @PluginProperty(dynamic = true)
    @Schema(title = "Request headers")
    private Map<String, String> headers;

    @PluginProperty(dynamic = true)
    @Schema(title = "Query parameters")
    private Map<String, String> queryParams;

    @PluginProperty(dynamic = true)
    @Schema(title = "Request body")
    private Object body;

    @PluginProperty
    @Schema(title = "Timeout in seconds")
    @Builder.Default
    private Integer timeout = 30;

    @PluginProperty
    @Schema(title = "Store response as file")
    @Builder.Default
    private Boolean store = false;

    public enum Method {
        GET, POST, PUT, DELETE, PATCH
    }

    @Override
    public Output run(RunContext runContext) throws Exception {
        // Render dynamic properties
        String renderedBaseUrl = runContext.render(baseUrl);
        String renderedEndpoint = runContext.render(endpoint);
        Map<String, String> renderedHeaders = renderMap(runContext, headers);
        Map<String, String> renderedQueryParams = renderMap(runContext, queryParams);
        Object renderedBody = renderBody(runContext, body);

        // Build URL with query parameters
        HttpUrl.Builder urlBuilder = HttpUrl.parse(renderedBaseUrl + renderedEndpoint).newBuilder();
        if (renderedQueryParams != null) {
            renderedQueryParams.forEach(urlBuilder::addQueryParameter);
        }

        // Build request
        Request.Builder requestBuilder = new Request.Builder()
            .url(urlBuilder.build());

        // Add headers
        if (renderedHeaders != null) {
            renderedHeaders.forEach(requestBuilder::addHeader);
        }

        // Add body for methods that support it
        if (renderedBody != null && 
            (method == Method.POST || method == Method.PUT || 
             method == Method.PATCH || method == Method.DELETE)) {

            String bodyString;
            String contentType;
            if (renderedBody instanceof String) {
                bodyString = (String) renderedBody;
                contentType = "text/plain";
            } else {
                bodyString = OBJECT_MAPPER.writeValueAsString(renderedBody);
                contentType = "application/json";
            }

            // Let OkHttp derive the Content-Type header from the request body
            RequestBody requestBody = RequestBody.create(
                bodyString,
                MediaType.parse(contentType)
            );

            requestBuilder.method(method.name(), requestBody);
        } else {
            requestBuilder.method(method.name(), null);
        }

        // Execute request with custom timeout
        OkHttpClient client = HTTP_CLIENT.newBuilder()
            .connectTimeout(Duration.ofSeconds(timeout))
            .readTimeout(Duration.ofSeconds(timeout))
            .writeTimeout(Duration.ofSeconds(timeout))
            .build();

        try (Response response = client.newCall(requestBuilder.build()).execute()) {
            String responseBody = response.body() != null ? 
                response.body().string() : null;

            Map<String, Object> responseMap = new HashMap<>();
            if (responseBody != null && !responseBody.isEmpty()) {
                try {
                    responseMap = OBJECT_MAPPER.readValue(responseBody, Map.class);
                } catch (Exception e) {
                    responseMap.put("raw", responseBody);
                }
            }

            return Output.builder()
                .statusCode(response.code())
                .headers(response.headers().toMultimap())
                .body(responseMap)
                .success(response.isSuccessful())
                .message(response.message())
                .build();
        }
    }

    private Map<String, String> renderMap(RunContext runContext, Map<String, String> map) throws Exception {
        if (map == null) {
            return null;
        }

        // runContext.render() throws a checked exception, so use a plain loop
        // rather than Map.forEach with a lambda
        Map<String, String> rendered = new HashMap<>();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            rendered.put(entry.getKey(), runContext.render(entry.getValue()));
        }
        return rendered;
    }

    private Object renderBody(RunContext runContext, Object body) throws Exception {
        if (body == null) {
            return null;
        }

        if (body instanceof String) {
            return runContext.render((String) body);
        } else if (body instanceof Map) {
            // Deep render map keys and values
            Map<?, ?> map = (Map<?, ?>) body;
            Map<Object, Object> rendered = new HashMap<>();
            for (Map.Entry<?, ?> entry : map.entrySet()) {
                Object key = entry.getKey();
                Object value = entry.getValue();
                rendered.put(
                    key instanceof String ? runContext.render((String) key) : key,
                    value instanceof String ? runContext.render((String) value) : value
                );
            }
            return rendered;
        }
        return body;
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(title = "HTTP status code")
        private final Integer statusCode;

        @Schema(title = "Response headers")
        private final Map<String, java.util.List<String>> headers;

        @Schema(title = "Response body")
        private final Map<String, Object> body;

        @Schema(title = "Whether the request was successful")
        private final Boolean success;

        @Schema(title = "Response message")
        private final String message;
    }
}
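The JSON-or-raw fallback in `run()` is worth internalizing: the task tries to parse the response body as a JSON object and, failing that, wraps the text under a `raw` key so downstream tasks always receive a map. A standalone Python sketch of that logic (illustrative, not part of the plugin):

```python
import json

def parse_response_body(body_text):
    """JSON object if parseable, otherwise wrap the raw text under 'raw',
    mirroring the fallback in the task's run() method."""
    if not body_text:
        return {}
    try:
        parsed = json.loads(body_text)
        # The Java code deserializes into a Map, so only JSON objects pass through
        if isinstance(parsed, dict):
            return parsed
    except ValueError:
        pass
    return {"raw": body_text}
```

Note that a JSON array also falls back to `raw`, since the Java side binds the body to a `Map`.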

Plugin Configuration (pom.xml)

<!-- pom.xml for custom plugin -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.company.kestra</groupId>
    <artifactId>kestra-plugin-custom</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <name>Kestra Custom Plugin</name>
    <description>Custom plugins for Company's systems</description>

    <properties>
        <kestra.version>0.15.0</kestra.version>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Kestra Core -->
        <dependency>
            <groupId>io.kestra</groupId>
            <artifactId>core</artifactId>
            <version>${kestra.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- External Dependencies -->
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>4.11.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.2</version>
        </dependency>

        <!-- Test Dependencies -->
        <dependency>
            <groupId>io.kestra</groupId>
            <artifactId>junit5</artifactId>
            <version>${kestra.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.4.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
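Why the signature-file excludes in the shade configuration? Shading copies entries from signed dependency JARs, and their `META-INF/*.SF`, `*.DSA`, and `*.RSA` files no longer match the merged contents, which makes the fat JAR fail signature verification at load time. The filter's effect can be sketched in Python with `fnmatch`:

```python
import fnmatch

# Patterns mirroring the maven-shade-plugin filter above
EXCLUDE_PATTERNS = ["META-INF/*.SF", "META-INF/*.DSA", "META-INF/*.RSA"]

def keep_entry(name):
    """Drop stale signature files copied from signed dependencies;
    keeping them would break verification of the shaded JAR."""
    return not any(fnmatch.fnmatch(name, pattern) for pattern in EXCLUDE_PATTERNS)

entries = ["META-INF/SIGNER.SF", "META-INF/MANIFEST.MF", "io/kestra/Plugin.class"]
shaded = [e for e in entries if keep_entry(e)]
```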

Testing Custom Plugins

// src/test/java/com/company/kestra/plugin/custom/CustomApiClientTest.java
package com.company.kestra.plugin.custom;

import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

@MicronautTest
class CustomApiClientTest {

    @Inject
    private RunContextFactory runContextFactory;

    @Test
    void testApiCall() throws Exception {
        // Create task instance
        CustomApiClient task = CustomApiClient.builder()
            .id("test-api")
            .type(CustomApiClient.class.getName())
            .baseUrl("https://httpbin.org")
            .endpoint("/get")
            .method(CustomApiClient.Method.GET)
            .headers(Map.of(
                "User-Agent", "Kestra-Plugin-Test"
            ))
            .queryParams(Map.of(
                "test", "value"
            ))
            .build();

        // Execute task
        CustomApiClient.Output output = task.run(
            TestsUtils.mockRunContext(
                runContextFactory, 
                task, 
                new HashMap<>()
            )
        );

        // Verify results
        assertThat(output.getStatusCode(), is(200));
        assertThat(output.getSuccess(), is(true));
        assertThat(output.getBody(), notNullValue());

        // Verify response contains our query param
        Map<String, Object> args = (Map<String, Object>) output.getBody().get("args");
        assertThat(args.get("test"), is("value"));
    }

    @Test
    void testPostRequest() throws Exception {
        Map<String, Object> requestBody = Map.of(
            "name", "John Doe",
            "email", "john@example.com",
            "active", true
        );

        CustomApiClient task = CustomApiClient.builder()
            .id("test-post")
            .type(CustomApiClient.class.getName())
            .baseUrl("https://httpbin.org")
            .endpoint("/post")
            .method(CustomApiClient.Method.POST)
            .body(requestBody)
            .build();

        CustomApiClient.Output output = task.run(
            TestsUtils.mockRunContext(
                runContextFactory, 
                task, 
                new HashMap<>()
            )
        );

        assertThat(output.getStatusCode(), is(200));

        Map<String, Object> json = (Map<String, Object>) 
            output.getBody().get("json");
        assertThat(json.get("name"), is("John Doe"));
        assertThat(json.get("active"), is(true));
    }
}
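The first test expects httpbin to echo `test=value` back under `args`, which exercises the task's URL building (base URL + endpoint + encoded query string). The same construction, sketched standalone in Python:

```python
from urllib.parse import urlencode

def build_url(base_url, endpoint, query_params=None):
    """Mirror the task's URL assembly: baseUrl + endpoint, then an
    encoded query string when parameters are present."""
    url = base_url + endpoint
    if query_params:
        url += "?" + urlencode(query_params)
    return url
```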

Plugin Documentation

# Custom Database Plugin

## Overview
This plugin allows querying custom database systems from Kestra workflows.

## Installation
1. Build the plugin:
   ```bash
   mvn clean package
   ```
2. Copy the JAR to Kestra's plugins directory:
   ```bash
   cp target/kestra-plugin-custom-1.0.0.jar /opt/kestra/plugins/
   ```
3. Restart Kestra.

## Usage

### Basic Query

```yaml
id: query_custom_db
type: com.company.kestra.plugin.custom.CustomDatabaseQuery
connectionString: jdbc:custom://host:5432/database
username: admin
password: "{{ secret('DB_PASSWORD') }}"
sql: |
  SELECT * FROM users
  WHERE created_at > '2024-01-01'
fetch: true
```

### Update Query

```yaml
id: update_records
type: com.company.kestra.plugin.custom.CustomDatabaseQuery
connectionString: jdbc:custom://host:5432/database
sql: |
  UPDATE users
  SET status = 'inactive'
  WHERE last_login < '2024-01-01'
fetch: false
```

## Properties

| Property | Type | Required | Description |
|------------------|---------|----------|----------------------------------------------|
| connectionString | String  | Yes      | JDBC connection string                       |
| username         | String  | No       | Database username                            |
| password         | String  | No       | Database password                            |
| sql              | String  | Yes      | SQL query to execute                         |
| fetch            | Boolean | No       | Whether to fetch results (default: true)     |
| fetchSize        | Integer | No       | Fetch size for large results (default: 1000) |
| queryTimeout     | Integer | No       | Query timeout in seconds (default: 30)       |

## Outputs

For SELECT queries (fetch: true):

```json
{
  "rowCount": 100,
  "rows": [...],
  "query": "SELECT * FROM users"
}
```

For UPDATE/INSERT/DELETE queries (fetch: false):

```json
{
  "rowCount": 50,
  "query": "UPDATE users SET status = 'active'"
}
```

## Examples

See the examples directory for more usage scenarios.

---

## **Part 4: Plugin Management and Optimization**

### **Plugin Registry Configuration**

```yaml
# kestra.yml - Plugin configuration
kestra:
  plugins:
    # Plugin repositories
    repositories:
      - id: kestra-io
        url: https://repo.kestra.io/repository/maven-public/
        type: maven
        releases: true
        snapshots: true

      - id: company-private
        url: https://nexus.company.com/repository/maven-releases/
        type: maven
        releases: true
        snapshots: false
        authentication:
          username: "{{ secret('NEXUS_USER') }}"
          password: "{{ secret('NEXUS_PASS') }}"

    # Plugin locations
    locations:
      - /opt/kestra/plugins  # System plugins
      - /app/plugins         # User plugins

    # Plugin scanning
    scan:
      enabled: true
      interval: PT5M  # Scan every 5 minutes

    # Plugin isolation
    isolation:
      enabled: true
      mode: PROCESS  # PROCESS or THREAD
      process:
        maxThreads: 10
        maxMemory: 1Gi
        timeout: PT5M

    # Cache configuration
    cache:
      enabled: true
      size: 1000
      expireAfterWrite: PT1H
```

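The `PT5M` and `PT1H` values in `scan.interval` and `cache.expireAfterWrite` are ISO-8601 durations (Kestra parses them on the JVM via `java.time.Duration`). A minimal Python parser for the hour/minute/second subset used here, for illustration:

```python
import re

def iso_duration_seconds(value):
    """Parse PT-style ISO-8601 durations such as PT5M or PT1H into seconds.
    Handles only the H/M/S components used in this configuration."""
    match = re.fullmatch(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", value)
    if not match or not any(match.groups()):
        raise ValueError(f"unsupported duration: {value}")
    hours, minutes, seconds = (int(g) if g else 0 for g in match.groups())
    return hours * 3600 + minutes * 60 + seconds
```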
Plugin Version Management

# flows/plugin-management.yml
id: plugin-version-manager
namespace: system.plugins

tasks:
  - id: check_plugin_updates
    type: io.kestra.plugin.core.http.Request
    uri: "https://repo.kestra.io/repository/maven-public/io/kestra/plugin/maven-metadata.xml"
    store: true

  - id: parse_plugin_versions
    type: io.kestra.plugin.scripts.python.Script
    inputFiles:
      metadata.xml: "{{ outputs.check_plugin_updates.uri }}"
    script: |
      import xml.etree.ElementTree as ET
      import json

      tree = ET.parse('metadata.xml')
      root = tree.getroot()

      versions = []
      for version in root.find('.//versioning/versions'):
          versions.append(version.text)

      latest = root.find('.//versioning/latest').text
      release = root.find('.//versioning/release').text

      result = {
          'total_versions': len(versions),
          'latest_version': latest,
          'release_version': release,
          'all_versions': versions[-10:]  # Last 10 versions
      }

      print(json.dumps(result, indent=2))

  - id: update_plugins_if_needed
    type: io.kestra.plugin.core.flow.EachSequential
    value: ["postgresql", "s3", "snowflake", "bigquery"]
    tasks:
      - id: "check_{{ task.value }}_version"
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.value }}: 1.2.3"  # Would be actual version check

      - id: "update_{{ task.value }}_if_outdated"
        type: io.kestra.plugin.core.flow.Subflow
        namespace: system.maintenance
        flowId: update-plugin
        wait: true
        transmit:
          - name: plugin_name
            value: "kestra-plugin-jdbc-{{ task.value }}"
          - name: target_version
            value: "latest"
        conditions:
          - type: io.kestra.plugin.core.condition.Expression
            expression: "{{ true }}"  # Actual version comparison
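The `{{ true }}` placeholder above stands in for a real version comparison. Dotted versions must be compared numerically, not as strings; a minimal sketch:

```python
def is_outdated(current, latest):
    """Numeric comparison of dotted versions, so '1.2.3' < '1.10.0' holds
    (a plain string comparison would get this wrong)."""
    as_tuple = lambda v: tuple(int(part) for part in v.split("."))
    return as_tuple(current) < as_tuple(latest)
```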

Plugin Performance Optimization

# flows/plugin-performance.yml
id: plugin-performance-optimization
namespace: demo.performance

tasks:
  # Batch operations instead of individual calls
  - id: batch_database_operations
    type: io.kestra.plugin.jdbc.postgresql.Query
    sql: |
      -- Batch insert
      INSERT INTO users (id, name, email) VALUES
      {% for user in inputs.users %}
      ('{{ user.id }}', '{{ user.name }}', '{{ user.email }}'){{ ',' if not loop.last }}
      {% endfor %}

      ON CONFLICT (id) DO UPDATE SET
        name = EXCLUDED.name,
        email = EXCLUDED.email;
    fetch: false

  # Connection pooling
  - id: connection_pool_config
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: jdbc:postgresql://localhost:5432/db
    username: user
    password: pass
    sql: SELECT * FROM large_table
    connectionProperties:
      maximumPoolSize: "10"
      minimumIdle: "5"
      connectionTimeout: "30000"
      idleTimeout: "600000"
      maxLifetime: "1800000"

  # Parallel plugin execution
  - id: parallel_plugin_execution
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: api_call_1
        type: io.kestra.plugin.core.http.Request
        uri: "https://api.example.com/data1"

      - id: api_call_2
        type: io.kestra.plugin.core.http.Request
        uri: "https://api.example.com/data2"

      - id: api_call_3
        type: io.kestra.plugin.core.http.Request
        uri: "https://api.example.com/data3"

  # Plugin caching
  - id: cached_operation
    type: io.kestra.plugin.core.cache.Cached
    key: "expensive_operation:{{ inputs.user_id }}"
    ttl: PT1H
    tasks:
      - id: expensive_computation
        type: io.kestra.plugin.scripts.python.Script
        script: |
          import time
          import json

          # Simulate expensive computation
          time.sleep(5)

          result = {"computed": True, "value": 42}
          print(json.dumps(result))

  # Memory optimization for large data
  - id: process_large_data
    type: io.kestra.plugin.jdbc.postgresql.Query
    sql: SELECT * FROM very_large_table
    fetch: true
    fetchSize: 10000  # Process in chunks
    chunkSize: 1000   # Smaller chunks for memory efficiency
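The Jinja loop in `batch_database_operations` flattens many rows into a single statement, trading per-row round trips for one bulk call. The same construction as a standalone Python helper (naive quoting, for illustration only; production code should use bind parameters or COPY):

```python
def batch_insert_sql(table, columns, rows):
    """Build one multi-row INSERT, like the Jinja template above.
    Quoting here is naive and illustrative only."""
    def quote(value):
        # Double single quotes, the standard SQL escape
        return "'" + str(value).replace("'", "''") + "'"

    values = ",\n".join(
        "(" + ", ".join(quote(row[c]) for c in columns) + ")" for row in rows
    )
    return f"INSERT INTO {table} ({', '.join(columns)}) VALUES\n{values};"
```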

Plugin Monitoring and Metrics

# flows/plugin-monitoring.yml
id: plugin-monitoring-dashboard
namespace: system.monitoring

tasks:
  - id: collect_plugin_metrics
    type: io.kestra.plugin.jdbc.postgresql.Query
    fetch: true
    sql: |
      WITH plugin_stats AS (
        SELECT 
          SUBSTRING(task_type FROM 'plugin\.([^.]*)') as plugin_name,
          COUNT(*) as execution_count,
          AVG(EXTRACT(EPOCH FROM (end_date - start_date))) * 1000 as avg_duration_ms,
          PERCENTILE_CONT(0.95) WITHIN GROUP (
            ORDER BY EXTRACT(EPOCH FROM (end_date - start_date)) * 1000
          ) as p95_duration_ms,
          SUM(CASE WHEN state = 'FAILED' THEN 1 ELSE 0 END) as failure_count
        FROM executions
        WHERE start_date >= NOW() - INTERVAL '7 days'
          AND task_type LIKE 'io.kestra.plugin.%'
        GROUP BY plugin_name
      ),
      plugin_health AS (
        SELECT 
          plugin_name,
          execution_count,
          avg_duration_ms,
          p95_duration_ms,
          failure_count,
          CASE 
            WHEN failure_count = 0 THEN 'HEALTHY'
            WHEN failure_count::float / execution_count < 0.01 THEN 'WARNING'
            ELSE 'CRITICAL'
          END as health_status
        FROM plugin_stats
      )
      SELECT * FROM plugin_health
      ORDER BY execution_count DESC

  - id: generate_plugin_report
    type: io.kestra.plugin.scripts.python.Script
    inputFiles:
      metrics.json: |
        {{ outputs.collect_plugin_metrics.rows | tojson }}
    script: |
      import json
      import pandas as pd
      from datetime import datetime

      with open('metrics.json') as f:
          data = json.load(f)
      df = pd.DataFrame(data)

      # Create summary report
      report = {
          'generated_at': datetime.now().isoformat(),
          'total_plugins': len(df),
          'total_executions': df['execution_count'].sum(),
          'health_summary': {
              'healthy': len(df[df['health_status'] == 'HEALTHY']),
              'warning': len(df[df['health_status'] == 'WARNING']),
              'critical': len(df[df['health_status'] == 'CRITICAL'])
          },
          'top_plugins_by_usage': df.nlargest(10, 'execution_count').to_dict('records'),
          'slowest_plugins': df.nlargest(5, 'p95_duration_ms').to_dict('records'),
          'most_failing_plugins': df.nlargest(5, 'failure_count').to_dict('records')
      }

      print(json.dumps(report, indent=2))

  - id: alert_on_plugin_issues
    type: io.kestra.plugin.notifications.slack.SlackIncomingWebhook
    url: "{{ secret('SLACK_WEBHOOK_URL') }}"
    payload: |
      {
        "blocks": [
          {
            "type": "header",
            "text": {
              "type": "plain_text",
              "text": "⚠️ Plugin Health Alert"
            }
          },
          {
            "type": "section",
            "text": {
              "type": "mrkdwn",
              "text": "*Critical Plugins:*\n{% for plugin in outputs.generate_plugin_report.vars.most_failing_plugins %}- {{ plugin.plugin_name }}: {{ plugin.failure_count }} failures\n{% endfor %}"
            }
          }
        ]
      }
    conditions:
      - type: io.kestra.plugin.core.condition.Expression
        expression: "{{ outputs.generate_plugin_report.vars.health_summary.critical > 0 }}"
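The `CASE` expression driving `health_status` is the heart of the monitoring query: zero failures is healthy, under a 1% failure rate is a warning, anything above is critical. Restated in Python for clarity:

```python
def health_status(execution_count, failure_count):
    """Reproduce the CASE expression in the monitoring query above."""
    if failure_count == 0:
        return "HEALTHY"
    if failure_count / execution_count < 0.01:
        return "WARNING"
    return "CRITICAL"
```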

Part 5: Enterprise Plugin Patterns

Plugin Factory Pattern

// Plugin factory for creating plugin instances dynamically
public class PluginFactory {

    private final Map<String, PluginCreator> creators = new HashMap<>();

    public PluginFactory() {
        // Register plugin creators
        creators.put("salesforce", new SalesforcePluginCreator());
        creators.put("sap", new SapPluginCreator());
        creators.put("oracle", new OraclePluginCreator());
    }

    public Task createPlugin(String system, Map<String, Object> config) {
        PluginCreator creator = creators.get(system.toLowerCase());
        if (creator == null) {
            throw new IllegalArgumentException("Unknown system: " + system);
        }
        return creator.create(config);
    }

    interface PluginCreator {
        Task create(Map<String, Object> config);
    }

    static class SalesforcePluginCreator implements PluginCreator {
        @Override
        public Task create(Map<String, Object> config) {
            return SalesforceQuery.builder()
                .instanceUrl((String) config.get("instanceUrl"))
                .clientId((String) config.get("clientId"))
                .clientSecret((String) config.get("clientSecret"))
                .soql((String) config.get("query"))
                .build();
        }
    }
}
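The same registry idea translates directly to Python, which can be handy if you generate task configurations outside the JVM (names here are hypothetical):

```python
class PluginFactory:
    """Sketch of the registry-based factory above: creators are registered
    per system name and looked up case-insensitively."""

    def __init__(self):
        self._creators = {}

    def register(self, system, creator):
        self._creators[system.lower()] = creator

    def create(self, system, config):
        creator = self._creators.get(system.lower())
        if creator is None:
            raise ValueError(f"Unknown system: {system}")
        return creator(config)

factory = PluginFactory()
factory.register("salesforce", lambda cfg: {"type": "SalesforceQuery", **cfg})
task = factory.create("Salesforce", {"soql": "SELECT Id FROM Account"})
```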

Plugin Adapter Pattern

# flows/plugin-adapter.yml
id: legacy-system-adapter
namespace: enterprise.integration

tasks:
  # Adapter for legacy SOAP service
  - id: soap_to_rest_adapter
    type: io.kestra.plugin.scripts.python.Script
    script: |
      import zeep
      import requests
      import json

      # Legacy SOAP client
      client = zeep.Client('https://legacy.company.com/wsdl')

      # Convert REST request to SOAP
      request_data = {{ inputs.rest_request | tojson }}

      soap_response = client.service.ProcessRequest(
          method=request_data['method'],
          params=json.dumps(request_data['params'])
      )

      # Convert SOAP response to REST format
      rest_response = {
          'status': 'success' if soap_response.success else 'error',
          'data': json.loads(soap_response.data),
          'timestamp': soap_response.timestamp
      }

      print(json.dumps(rest_response))

  # Adapter for mainframe (via SFTP)
  - id: mainframe_adapter
    type: io.kestra.plugin.fs.sftp.Download
    host: mainframe.company.com
    port: 22
    username: "{{ secret('MAINFRAME_USER') }}"
    password: "{{ secret('MAINFRAME_PASS') }}"
    from: "/reports/{{ execution.startDate | date('yyyyMMdd') }}.txt"

  - id: parse_mainframe_data
    type: io.kestra.plugin.scripts.python.Script
    inputFiles:
      mainframe_data.txt: "{{ outputs.mainframe_adapter.uri }}"
    script: |
      # Parse fixed-width mainframe format
      import pandas as pd

      with open('mainframe_data.txt', 'r') as f:
          lines = f.readlines()

      # Define fixed-width format (example)
      format_spec = [
          ('customer_id', 10),
          ('name', 30),
          ('balance', 12),
          ('status', 1)
      ]

      records = []
      for line in lines:
          record = {}
          pos = 0
          for field, width in format_spec:
              value = line[pos:pos+width].strip()
              record[field] = value
              pos += width
          records.append(record)

      df = pd.DataFrame(records)
      df.to_parquet('mainframe_parsed.parquet', index=False)

Plugin Security Patterns

# flows/secure-plugin-usage.yml
id: secure-plugin-patterns
namespace: enterprise.security

tasks:
  # Secrets management
  - id: secure_database_connection
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: "jdbc:postgresql://{{ secret('DB_HOST') }}:5432/{{ secret('DB_NAME') }}"
    username: "{{ secret('DB_USER') }}"
    password: "{{ secret('DB_PASSWORD') }}"
    sql: SELECT * FROM secure_table
    connectionProperties:
      ssl: "true"
      sslmode: "verify-full"
      sslrootcert: "/certs/ca.pem"

  # Encrypted data handling
  - id: process_encrypted_data
    type: io.kestra.plugin.scripts.python.Script
    script: |
      import gnupg
      import pandas as pd
      from io import StringIO

      # Initialize GPG
      gpg = gnupg.GPG(gnupghome='/app/.gnupg')

      # Decrypt data
      with open('encrypted_data.pgp', 'rb') as f:
          decrypted = gpg.decrypt_file(f, passphrase="{{ secret('GPG_PASSPHRASE') }}")

      if decrypted.ok:
          # Process decrypted data
          df = pd.read_csv(StringIO(str(decrypted)))
          # ... processing ...

          # Re-encrypt results
          encrypted = gpg.encrypt(
              df.to_csv(index=False),
              recipients=['data-team@company.com'],
              always_trust=True
          )

          with open('results.pgp', 'w') as f:
              f.write(str(encrypted))
      else:
          raise Exception(f"Decryption failed: {decrypted.stderr}")
    docker:
      image: python:3.11-slim
      volumes:
        - /secrets/gnupg:/app/.gnupg:ro

  # Audit logging for sensitive operations
  - id: audited_plugin_operation
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: log_operation_start
        type: io.kestra.plugin.jdbc.postgresql.Query
        sql: |
          INSERT INTO audit_log 
          (operation, user_id, start_time, parameters, execution_id)
          VALUES (
            'pii_data_extract',
            '{{ execution.user }}',
            NOW(),
            '{{ inputs | tojson }}',
            '{{ execution.id }}'
          )

      - id: perform_sensitive_operation
        type: io.kestra.plugin.jdbc.postgresql.Query
        sql: |
          SELECT 
            user_id,
            email,
            phone_number,
            masked_ssn
          FROM pii_data
          WHERE department = '{{ inputs.department }}'
        fetch: true

      - id: log_operation_complete
        type: io.kestra.plugin.jdbc.postgresql.Query
        sql: |
          UPDATE audit_log 
          SET 
            end_time = NOW(),
            row_count = {{ outputs.perform_sensitive_operation.rowCount }},
            status = 'SUCCESS'
          WHERE execution_id = '{{ execution.id }}'
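The query above assumes a pre-masked `masked_ssn` column, i.e. masking happens before data leaves the source. Masking helpers of that kind might look like this (illustrative only, not a Kestra API):

```python
import re

def mask_email(value):
    """Keep the first character and the domain: 'john@example.com' -> 'j***@example.com'."""
    local, _, domain = value.partition("@")
    return (local[:1] + "***@" + domain) if domain else "***"

def mask_ssn(value):
    """Show only the last four digits: '123-45-6789' -> '***-**-6789'."""
    digits = re.sub(r"\D", "", value)
    return "***-**-" + digits[-4:]
```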

Plugin Testing Framework

# flows/plugin-testing-framework.yml
id: plugin-testing-pipeline
namespace: quality.assurance

inputs:
  - name: plugin_name
    type: STRING
    required: true
  - name: test_scenarios
    type: JSON
    required: true

tasks:
  - id: setup_test_environment
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: start_test_database
        type: io.kestra.plugin.docker.Run
        image: postgres:15
        env:
          POSTGRES_PASSWORD: test
          POSTGRES_DB: test_db
        ports:
          - 5432:5432
        wait: true
        maxDuration: PT1M

      - id: start_test_api
        type: io.kestra.plugin.docker.Run
        image: mockserver/mockserver
        command: -serverPort 1080
        ports:
          - 1080:1080
        wait: true
        maxDuration: PT1M

  - id: run_plugin_tests
    type: io.kestra.plugin.core.flow.EachSequential
    value: "{{ inputs.test_scenarios }}"
    tasks:
      - id: "execute_test_{{ taskloop.index }}"
        type: io.kestra.plugin.core.flow.Switch
        value: "{{ task.value.type }}"
        cases:
          database:
            - id: run_database_test
              type: io.kestra.plugin.jdbc.postgresql.Query
              url: jdbc:postgresql://localhost:5432/test_db
              username: postgres
              password: test
              sql: "{{ task.value.query }}"
              fetch: true

          api:
            - id: setup_api_mock
              type: io.kestra.plugin.core.http.Request
              uri: http://localhost:1080/mockserver/expectation
              method: PUT
              body: "{{ task.value.mock_config | tojson }}"

            - id: test_api_call
              type: io.kestra.plugin.core.http.Request
              uri: "http://localhost:1080{{ task.value.endpoint }}"
              method: "{{ task.value.method }}"
              body: "{{ task.value.request_body | tojson }}"

          performance:
            - id: run_performance_test
              type: io.kestra.plugin.core.flow.EachSequential
              value: "{{ range(1, task.value.iterations + 1) }}"
              maxParallel: 10
              tasks:
                - id: "iteration_{{ taskloop.value }}"
                  type: io.kestra.plugin.core.debug.Return
                  format: "{{ task.value }}"

  - id: collect_test_results
    type: io.kestra.plugin.scripts.python.Script
    script: |
      import json
      import statistics

      # Collect results from all tests
      test_results = []

      {% for i in range(1, inputs.test_scenarios|length + 1) %}
      test_output = {{ outputs.run_plugin_tests.outputs['execute_test_' + i|string] | default({}) | tojson }}
      if test_output:
          test_results.append({
              'scenario': {{ i }},
              'success': test_output.get('state') == 'SUCCESS',
              'duration': test_output.get('duration', 0)
          })
      {% endfor %}

      # Calculate statistics
      summary = {
          'total_tests': len(test_results),
          'passed_tests': sum(1 for r in test_results if r['success']),
          'failed_tests': sum(1 for r in test_results if not r['success']),
          'pass_rate': sum(1 for r in test_results if r['success']) / len(test_results) * 100 if test_results else 0,
          'avg_duration': statistics.mean(r['duration'] for r in test_results) if test_results else 0,
          'results': test_results
      }

      print(json.dumps(summary, indent=2))

  - id: generate_test_report
    type: io.kestra.plugin.notifications.email.EmailSend
    to: "qa-team@company.com"
    subject: "Plugin Test Results - {{ inputs.plugin_name }}"
    htmlBody: |
      <h1>Plugin Test Report</h1>
      <p><strong>Plugin:</strong> {{ inputs.plugin_name }}</p>
      <p><strong>Pass Rate:</strong> {{ outputs.collect_test_results.vars.pass_rate | round(2) }}%</p>
      <p><strong>Total Tests:</strong> {{ outputs.collect_test_results.vars.total_tests }}</p>
      <p><strong>Execution:</strong> {{ execution.id }}</p>
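The summary computed in `collect_test_results` boils down to a few aggregates over the per-scenario results; as a standalone function:

```python
import statistics

def summarize(test_results):
    """Mirror the pass-rate and duration summary from collect_test_results.
    Each result is a dict with 'success' (bool) and 'duration' (seconds)."""
    total = len(test_results)
    passed = sum(1 for r in test_results if r["success"])
    return {
        "total_tests": total,
        "passed_tests": passed,
        "failed_tests": total - passed,
        "pass_rate": passed / total * 100 if total else 0,
        "avg_duration": statistics.mean(r["duration"] for r in test_results) if test_results else 0,
    }
```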

Part 6: Contributing to the Plugin Ecosystem

Open Source Contribution Guide

# 1. Fork the repository on GitHub:
#    https://github.com/kestra-io/kestra

# 2. Clone your fork
git clone https://github.com/YOUR_USERNAME/kestra.git
cd kestra

# 3. Create a feature branch
git checkout -b feature/new-plugin

# 4. Create plugin structure
mkdir -p plugin-<name>/src/main/java/io/kestra/plugin/<category>
mkdir -p plugin-<name>/src/test/java/io/kestra/plugin/<category>

# 5. Implement your plugin
# (Follow existing plugin patterns)

# 6. Write tests
# (Aim for >80% test coverage)

# 7. Update documentation
# - README.md in plugin directory
# - Add examples in @Plugin annotation
# - Update plugin list in docs

# 8. Run tests
mvn test -pl plugin-<name>

# 9. Submit pull request
git push origin feature/new-plugin
# Create PR on GitHub

Plugin Contribution Checklist

## New Plugin Contribution Checklist

### ✅ Code Quality
- [ ] Follows Kestra coding conventions
- [ ] Proper error handling and logging
- [ ] Input validation with @NotNull annotations
- [ ] Comprehensive JavaDoc comments
- [ ] No SonarQube issues

### ✅ Testing
- [ ] Unit tests for all public methods
- [ ] Integration tests with real systems
- [ ] Test edge cases and error conditions
- [ ] Mock external dependencies
- [ ] Test coverage > 80%

### ✅ Documentation
- [ ] README.md with usage examples
- [ ] @Plugin annotation with examples
- [ ] Swagger annotations for all properties
- [ ] Clear property descriptions
- [ ] Example flows in /examples directory

### ✅ Configuration
- [ ] Proper Gradle build file (build.gradle)
- [ ] Plugin registered in the parent Gradle build
- [ ] Version aligned with Kestra core
- [ ] No conflicting dependencies

### ✅ Security
- [ ] Secrets marked with secret = true
- [ ] No hardcoded credentials
- [ ] Input sanitization
- [ ] SSL/TLS support where applicable

### ✅ Performance
- [ ] Connection pooling for databases
- [ ] Batch operations for bulk data
- [ ] Memory-efficient streaming
- [ ] Timeout configurations
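The "batch operations" and "memory-efficient streaming" items above boil down to the same idea: chunk the input lazily instead of materializing it all in memory. A minimal, language-agnostic sketch in Python:

```python
from itertools import islice

def batched(iterable, size):
    """Yield fixed-size chunks, consuming the source lazily instead of
    loading everything into memory at once."""
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk

# A bulk writer would then issue one insert per chunk rather than per row.
print(list(batched(range(10), 4)))  # → [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

In a plugin, `size` becomes a user-facing property with a sensible default, which is also where the "timeout configurations" item applies.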
# Example of a well-documented community plugin
id: community-plugin-showcase
namespace: demo.community

tasks:
  # Example: dbt plugin from community
  - id: run_dbt_models
    type: io.kestra.plugin.dbt.cli.Run
    projectDir: "s3://my-dbt-project/"
    profilesDir: "s3://my-dbt-profiles/"
    target: prod
    select:
      - tag:hourly
      - model_name+
    exclude:
      - tag:disabled

  # Example: Great Expectations plugin
  - id: data_quality_check
    type: io.kestra.plugin.greatexpectations.Validation
    expectationSuite: |
      {
        "expectation_suite_name": "customer_data",
        "expectations": [
          {
            "expectation_type": "expect_column_values_to_not_be_null",
            "kwargs": {"column": "customer_id"}
          }
        ]
      }
    data:
      type: file
      path: "{{ inputs.data_file }}"

  # Example: Airbyte connector
  - id: sync_with_airbyte
    type: io.kestra.plugin.airbyte.Sync
    connectionId: "{{ secret('AIRBYTE_CONNECTION_ID') }}"
    workspaceId: "{{ secret('AIRBYTE_WORKSPACE_ID') }}"
    apiUrl: "https://api.airbyte.com"
    wait: true

  # Example: Dagster integration
  - id: trigger_dagster_pipeline
    type: io.kestra.plugin.dagster.Run
    repository: my_repo
    pipeline: process_data
    mode: default
    runConfig:
      resources:
        io_manager:
          config:
            s3_bucket: "dagster-output"

Conclusion: Becoming a Plugin Master

Key Takeaways

  1. Kestra's plugin ecosystem is vast: 150+ plugins covering every data engineering need

  2. Custom plugins are straightforward: Java-based, with clear interfaces and patterns

  3. Enterprise integration is possible: Create plugins for any system, no matter how legacy it is

  4. Performance matters: Optimize plugins for production workloads

  5. Community contribution is valuable: Share your plugins to help others

Next Steps for You

  1. Audit your data stack: What systems need plugins?

  2. Start simple: Create one custom plugin for an internal API

  3. Contribute: Find a missing plugin in the ecosystem and build it

  4. Share: Write a blog post about your plugin journey

  5. Optimize: Review existing plugins for performance improvements

In the next article, we'll dive into Monitoring and Observability in Kestra - how to ensure your workflows are running smoothly and troubleshoot when they're not.

Before the next article, try:

  1. Create a simple custom plugin for a system you use

  2. Optimize an existing plugin configuration for better performance

  3. Contribute documentation or examples to an existing plugin

  4. Set up a plugin testing pipeline for your team

  5. Share your plugin creation experience in the Kestra community

Remember: Every great data platform starts with great plugins. What will you build?

Happy plugin developing! 🛠️