
Workflow Orchestration

Orchestrate complex multi-step workflows with the Skill Engine's powerful workflow system.

Overview

The Workflow Orchestration system enables you to:

  • Chain multiple tools together with dependencies
  • Execute steps in parallel for maximum performance
  • Add conditional logic with when clauses
  • Handle errors gracefully with retry policies
  • Loop over collections with foreach steps
  • Run arbitrary Docker containers for complex processing (FFmpeg, data tools, etc.)
  • Execute inline code in Python, Bash, Node.js, or Ruby
  • Monitor in real-time via WebSocket and Web UI

New in v0.3.4: Docker container steps and inline code execution provide powerful ways to extend workflows without creating custom skills, while maintaining security through Docker isolation.

Quick Start

1. Create a Workflow

Create a .toml file in .workflows/:

toml
version = "1"
name = "hello-workflow"
description = "My first workflow"

[[steps]]
id = "greet"
step_type = "skill"
skill = "python"
tool = "execute"
args = { script = "print('Hello, Workflow!')" }

2. Execute the Workflow

Via CLI:

bash
skill workflow run .workflows/hello-workflow.toml

Via MCP:

bash
mcp-cli call skill-engine/execute_workflow '{
  "workflow": "hello-workflow",
  "inputs": {}
}'

Via HTTP API:

bash
curl -X POST http://localhost:3000/api/workflows/hello-workflow/execute \
  -H "Content-Type: application/json" \
  -d '{"inputs": {}}'

Core Concepts

Workflow Structure

toml
version = "1"                    # Workflow format version
name = "my-workflow"             # Unique workflow name
description = "..."              # Human-readable description

[config]                         # Optional configuration
max_execution_time = "30m"       # Total workflow timeout
retry_policy = "exponential"     # Default retry strategy
max_retries = 3                  # Max retry attempts
on_failure = "stop"              # Error handling: stop | continue
max_parallel = 10                # Max concurrent steps

[inputs]                         # Runtime parameters
param1 = { type = "string", required = true }
param2 = { type = "number", default = 100 }

[[steps]]                        # Step definitions
id = "step-1"                    # Unique step ID
step_type = "skill"              # Step type (see below)
skill = "skill-name"             # Skill to execute (for skill steps)
tool = "tool-name"               # Tool to run (for skill steps)
args = { key = "value" }         # Tool arguments
depends_on = []                  # Dependencies (other step IDs)
when = "condition"               # Optional conditional execution
timeout = "5m"                   # Step timeout

Step Types

The workflow engine supports six step types:

  • skill - Execute a skill tool. Use case: reusable functionality from the skill marketplace.
  • command - Run a shell command. Use case: simple CLI operations.
  • http - Make an HTTP request. Use case: API calls, webhooks.
  • foreach - Loop over a collection. Use case: batch processing, parallel operations.
  • docker_container - Run an arbitrary Docker container. Use case: complex tools (FFmpeg, data processing).
  • inline_code - Execute Python, Bash, Node.js, or Ruby. Use case: quick scripts without creating skills.

Skill Step

Execute a skill tool:

toml
[[steps]]
id = "call-api"
step_type = "skill"
skill = "http"
tool = "request"
args = { url = "https://api.example.com/data", method = "GET" }

Command Step

Run shell commands:

toml
[[steps]]
id = "run-script"
step_type = "command"
command = "python scripts/process.py --input data.json"
timeout = "5m"

HTTP Step

Make HTTP requests:

toml
[[steps]]
id = "webhook"
step_type = "http"
[steps.http]
method = "POST"
url = "https://hooks.slack.com/services/..."
headers = { "Content-Type" = "application/json" }
body = { text = "Workflow completed!" }

Foreach Step

Loop over collections:

toml
[[steps]]
id = "process-regions"
step_type = "foreach"
items = ["us-east-1", "eu-west-1", "ap-south-1"]
parallel = true
max_parallel = 3
max_iterations = 1000

[[steps.loop]]
id = "deploy-to-region"
step_type = "skill"
skill = "aws"
tool = "deploy"
args = { region = "{loop.item}" }

Docker Container Step

Execute arbitrary Docker containers:

toml
[[steps]]
id = "video-processing"
step_type = "docker_container"
timeout = "10m"

[steps.docker]
image = "jrottenberg/ffmpeg:5-alpine"
command = ["-i", "/input/video.mp4", "-vf", "scale=1280:720", "/output/output.mp4"]
volumes = ["./input:/input", "./output:/output"]
memory_limit = "512m"
cpu_limit = 1.0
environment = { "QUALITY" = "high" }
working_dir = "/workspace"
network = "none"  # Isolated by default for security
user = "1000:1000"  # Run as non-root

Security Features:

  • Containers run isolated by default (no network access)
  • Privileged mode is blocked
  • Docker socket access is blocked
  • Resource limits are enforced
  • Containers run as non-root user by default

Inline Code Step

Execute Python, Bash, Node.js, or Ruby code:

toml
[[steps]]
id = "process-data"
step_type = "inline_code"

[steps.inline_code]
language = "python"
code = '''
import json
import sys

# Read input from stdin
data = json.loads(sys.stdin.read())

# Process data
result = {
    "count": len(data),
    "items": [item.upper() for item in data]
}

# Output as JSON
print(json.dumps(result))
'''
stdin = "{inputs.data}"
environment = { "DEBUG" = "true" }

Execution Modes:

  • Docker (default): Runs code in isolated container for security
  • Native (unsafe): Runs directly on host (use unsafe_native = true)

Supported Languages:

  • python - Python 3.11 (default image: python:3.11-slim)
  • bash - Bash shell (default image: alpine:latest)
  • node - Node.js 20 (default image: node:20-alpine)
  • ruby - Ruby 3.2 (default image: ruby:3.2-alpine)
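
The examples above use Python and Bash; the same pattern in Node.js might look like the following minimal sketch (the count-items step ID, the items input, and the array-shaped payload are illustrative assumptions):

toml
[[steps]]
id = "count-items"
step_type = "inline_code"

[steps.inline_code]
language = "node"
code = '''
// Read the JSON payload supplied on stdin and emit a JSON summary on stdout
const data = JSON.parse(require("fs").readFileSync(0, "utf8"));
console.log(JSON.stringify({ count: data.length }));
'''
stdin = "{inputs.items}"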

Using External Scripts:

toml
[steps.inline_code]
language = "python"
script_file = ".workflows/scripts/process.py"
# Or use inline code with 'code' field

Native Execution (Trusted Workflows Only):

toml
[steps.inline_code]
language = "bash"
code = "cargo build --release"
unsafe_native = true  # Runs on host, bypasses Docker isolation
working_dir = "."

⚠️ Security Warning: Only use unsafe_native = true for trusted workflows. Native execution bypasses all Docker isolation and runs with the same permissions as the workflow engine.

Template Variables

Access data from previous steps and inputs:

toml
# Workflow inputs
"{inputs.param_name}"

# Step outputs
"{step-id.output}"
"{step-id.output.nested.field}"

# Step metadata
"{step-id.metadata.key}"
"{step-id.status}"
"{step-id.duration_ms}"

# Loop context
"{loop.item}"        # Current iteration value
"{loop.index}"       # Current iteration index

# Execution context
"{execution_id}"     # Unique execution ID

Dependencies and Execution Order

Steps with no dependencies run in parallel:

toml
[[steps]]
id = "fetch-a"
# Runs immediately

[[steps]]
id = "fetch-b"
# Runs in parallel with fetch-a

[[steps]]
id = "combine"
depends_on = ["fetch-a", "fetch-b"]
# Waits for both to complete

The execution engine automatically:

  • Computes execution levels via topological sort
  • Executes independent steps in parallel
  • Respects max_parallel configuration limit
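
For example, a concurrency cap does not change the dependency graph, only how many ready steps run at once; a minimal sketch (step bodies omitted):

toml
[config]
max_parallel = 2             # at most two steps execute simultaneously

[[steps]]
id = "fetch-a"               # level 0: starts immediately

[[steps]]
id = "fetch-b"               # level 0: runs alongside fetch-a

[[steps]]
id = "fetch-c"               # level 0 as well, but waits for a free slot

[[steps]]
id = "combine"               # level 1: starts only after all three fetches finish
depends_on = ["fetch-a", "fetch-b", "fetch-c"]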

Conditional Execution

Use when to conditionally execute steps:

toml
[[steps]]
id = "validate-data"
# Always runs

[[steps]]
id = "process-data"
when = "{validate-data.metadata.quality_score} > 0.9"
depends_on = ["validate-data"]
# Only runs if quality score is high

[[steps]]
id = "send-alert"
when = "{validate-data.status} == 'failed'"
depends_on = ["validate-data"]
# Only runs if validation failed

Supported Operators:

  • Comparison: ==, !=, >, <, >=, <=
  • Logical: &&, ||, !
  • Membership: in, not in
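
Operators can be combined within a single when expression; a hedged sketch (the run-tests step and its coverage output field are illustrative):

toml
[[steps]]
id = "publish-report"
depends_on = ["run-tests"]
when = "{run-tests.status} == 'completed' && {run-tests.output.coverage} >= 80"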

Error Handling and Retries

Global Error Strategy:

toml
[config]
on_failure = "stop"      # Stop on first error (default)
# or
on_failure = "continue"  # Continue with remaining steps

Per-Step Retry:

toml
[[steps]]
id = "flaky-api"
[steps.retry]
max_attempts = 3
backoff = "exponential"   # none | linear | exponential | jitter
initial_delay = "1s"
max_delay = "5m"

Backoff Strategies:

  • none: Immediate retry
  • linear: Fixed delay (1s, 1s, 1s, ...)
  • exponential: Doubling delay (1s, 2s, 4s, 8s, ...)
  • jitter: Exponential with randomization
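
Putting it together, a flaky external call might pair a per-step timeout with jittered retries; a minimal sketch (the endpoint is illustrative):

toml
[[steps]]
id = "sync-inventory"
step_type = "http"
timeout = "30s"

[steps.http]
method = "POST"
url = "https://api.example.com/inventory/sync"

[steps.retry]
max_attempts = 5
backoff = "jitter"           # exponential backoff with randomization
initial_delay = "2s"
max_delay = "1m"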

Advanced Features

Parallel Health Checks

toml
[[steps]]
id = "health-checks"
step_type = "foreach"
items = ["service-a", "service-b", "service-c"]
parallel = true
max_parallel = 10

[[steps.loop]]
id = "check-service"
step_type = "http"
[steps.loop.http]
method = "GET"
url = "https://{loop.item}.example.com/health"
timeout = "5s"

Conditional Branching

toml
[[steps]]
id = "check-environment"

[[steps]]
id = "deploy-to-prod"
when = "{check-environment.output.env} == 'production'"
depends_on = ["check-environment"]

[[steps]]
id = "deploy-to-staging"
when = "{check-environment.output.env} == 'staging'"
depends_on = ["check-environment"]

Nested Data Access

toml
[[steps]]
id = "fetch-user"
# Returns: { "output": { "user": { "id": 123, "name": "Alice" } } }

[[steps]]
id = "greet-user"
args = { user_id = "{fetch-user.output.user.id}", name = "{fetch-user.output.user.name}" }

Video Processing Pipeline

toml
version = "1"
name = "video-transcoding"
description = "Transcode videos to multiple resolutions"

[[steps]]
id = "transcode-720p"
step_type = "docker_container"
timeout = "30m"

[steps.docker]
image = "jrottenberg/ffmpeg:5-alpine"
volumes = ["{inputs.input_dir}:/input", "{inputs.output_dir}:/output"]
command = [
    "-i", "/input/{inputs.filename}",
    "-vf", "scale=1280:720",
    "-c:v", "libx264",
    "-crf", "23",
    "/output/{inputs.filename}_720p.mp4"
]
memory_limit = "2g"
cpu_limit = 2.0

[[steps]]
id = "transcode-480p"
step_type = "docker_container"
timeout = "30m"

[steps.docker]
image = "jrottenberg/ffmpeg:5-alpine"
volumes = ["{inputs.input_dir}:/input", "{inputs.output_dir}:/output"]
command = [
    "-i", "/input/{inputs.filename}",
    "-vf", "scale=854:480",
    "-c:v", "libx264",
    "-crf", "23",
    "/output/{inputs.filename}_480p.mp4"
]
memory_limit = "1g"
cpu_limit = 1.0

[[steps]]
id = "generate-thumbnails"
step_type = "inline_code"
depends_on = ["transcode-720p"]

[steps.inline_code]
language = "bash"
code = '''
ffmpeg -i "{inputs.output_dir}/{inputs.filename}_720p.mp4" \
  -vf "fps=1/10,scale=320:180" \
  "{inputs.output_dir}/{inputs.filename}_thumb_%03d.jpg"
'''

Data Processing with Python

toml
[[steps]]
id = "fetch-data"
step_type = "http"
[steps.http]
method = "GET"
url = "https://api.example.com/data"

[[steps]]
id = "transform-data"
step_type = "inline_code"
depends_on = ["fetch-data"]

[steps.inline_code]
language = "python"
code = '''
import json
import sys

# Read data from previous step
data = json.loads(sys.stdin.read())

# Transform data
transformed = [
    {
        "id": item["id"],
        "name": item["name"].upper(),
        "processed_at": "2024-01-01"
    }
    for item in data["items"]
]

# Output result
print(json.dumps({"transformed": transformed}))
'''
stdin = "{fetch-data.output}"
environment = { "TZ" = "UTC" }

[[steps]]
id = "upload-results"
step_type = "http"
depends_on = ["transform-data"]
[steps.http]
method = "POST"
url = "https://api.example.com/results"
body = "{transform-data.output.transformed}"

Parallel Batch Processing

toml
[[steps]]
id = "list-files"
step_type = "inline_code"

[steps.inline_code]
language = "bash"
code = "ls -1 /data/input/*.csv | xargs -n1 basename"

[[steps]]
id = "process-files"
step_type = "foreach"
depends_on = ["list-files"]
items = "{list-files.output}"  # Dynamic list from previous step
parallel = true
max_parallel = 5

[[steps.loop]]
id = "process-csv"
step_type = "docker_container"

[steps.loop.docker]
image = "python:3.11-slim"
volumes = ["/data:/data"]
command = ["python", "-c", '''
import pandas as pd
df = pd.read_csv(f"/data/input/{loop.item}")
df["processed"] = True
df.to_csv(f"/data/output/{loop.item}", index=False)
''']

Resource Limits

toml
[config]
max_execution_time = "1h"        # Total workflow timeout
max_concurrent_steps = 20        # Max steps running simultaneously
max_total_steps = 1000          # Max steps in workflow
max_loop_iterations = 10000      # Max iterations per loop

[[steps]]
timeout = "5m"                   # Per-step timeout
memory_limit = "2GB"             # Step memory limit (Docker/WASM)
cpu_limit = "2"                  # CPU allocation (Docker)

Real-Time Monitoring

WebSocket Connection

Connect to workflow execution via WebSocket:

javascript
const ws = new WebSocket(`ws://localhost:3000/ws/executions/${executionId}`);

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case 'workflow_started':
      console.log('Workflow started:', data.workflow_name);
      break;
    case 'step_started':
      console.log('Step started:', data.step_id);
      break;
    case 'step_completed':
      console.log('Step completed:', data.step_id, data.output);
      break;
    case 'step_failed':
      console.error('Step failed:', data.step_id, data.error);
      break;
    case 'workflow_completed':
      console.log('Workflow completed:', data.status);
      ws.close();
      break;
  }
};

// Send commands to server
ws.send(JSON.stringify({ command: 'cancel' }));

Event Types

  • workflow_started - Execution began
  • step_started - Step execution started
  • step_progress - Step progress update
  • step_completed - Step finished successfully
  • step_failed - Step failed with error
  • step_skipped - Step skipped (condition not met)
  • workflow_completed - Entire workflow finished
  • workflow_cancelled - Workflow was cancelled

API Reference

HTTP Endpoints

List Workflows:

http
GET /api/workflows

Get Workflow:

http
GET /api/workflows/:name

Create Workflow:

http
POST /api/workflows
Content-Type: application/json

{
  "name": "my-workflow",
  "content": "... TOML content ...",
  "format": "toml"
}

Execute Workflow:

http
POST /api/workflows/:name/execute
Content-Type: application/json

{
  "inputs": {
    "param1": "value1",
    "param2": 123
  },
  "async_execution": true
}

Get Execution Status:

http
GET /api/executions/:execution_id

Cancel Execution:

http
POST /api/executions/:execution_id/cancel

List Executions:

http
GET /api/executions?workflow_name=my-workflow&status=completed&limit=10

MCP Tools

execute_workflow:

javascript
{
  "workflow": "workflow-name",  // or file path
  "inputs": { "key": "value" },
  "async_execution": true
}

get_workflow_status:

javascript
{
  "execution_id": "uuid"
}

list_workflows:

javascript
{
  "name_pattern": ".*pipeline.*"  // optional regex
}

list_executions:

javascript
{
  "workflow_name": "my-workflow",
  "status": "completed",
  "limit": 10
}

Example Workflows

See .workflows/examples/ for production-ready examples:

  • data-pipeline.toml - ETL workflow with validation and error handling
  • k8s-deploy.toml - Kubernetes deployment with health checks and rollback
  • ai-research.toml - Multi-source research with AI summarization

Best Practices

1. Design for Idempotency

Ensure steps can be safely retried:

toml
[[steps]]
args = { operation = "upsert", idempotency_key = "{execution_id}" }  # Use upsert/merge, not plain insert

2. Set Appropriate Timeouts

toml
[[steps]]
timeout = "5m"  # Don't use default - be explicit

3. Use Meaningful Step IDs

toml
[[steps]]
id = "fetch-customer-data"  # Good
# not: "step1", "temp", "abc"

4. Add Metadata for Observability

toml
[[steps]]
# Your tool should emit metadata for debugging
# metadata = { row_count = 1234, duration_ms = 567 }

5. Validate Inputs Early

toml
[[steps]]
id = "validate-inputs"
# First step - fail fast if inputs are invalid
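
Expanding that stub into a hedged sketch (the region input and allowed values are illustrative, and this assumes a non-zero exit code marks the step, and therefore the run, as failed):

toml
[[steps]]
id = "validate-inputs"
step_type = "inline_code"

[steps.inline_code]
language = "python"
code = '''
import sys

region = "{inputs.region}"                      # illustrative input
if region not in ("us-east-1", "eu-west-1"):
    sys.stderr.write("invalid region: " + region + "\n")
    sys.exit(1)                                 # assumed to fail the step and stop the workflow
print("inputs ok")
'''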

6. Handle Partial Failures

toml
[config]
on_failure = "continue"  # For independent steps

[[steps]]
when = "{step-a.status} == 'completed'"  # Check status before using output

7. Use Loops Wisely

toml
[[steps]]
max_iterations = 100      # Set reasonable limit
parallel = true           # Use parallel when possible
max_parallel = 10         # Limit concurrency

8. Security Best Practices

For Docker Container Steps:

toml
[steps.docker]
image = "python:3.11-slim"      # Use specific tags, not :latest
network = "none"                # Default - no network access
user = "1000:1000"             # Run as non-root
memory_limit = "512m"          # Set resource limits
cpu_limit = 1.0
# NEVER mount docker.sock or use privileged mode

For Inline Code Steps:

toml
[steps.inline_code]
# Default: Runs in Docker (secure)
language = "python"
code = "..."

# ONLY use unsafe_native for trusted workflows
# unsafe_native = true  # Bypasses Docker isolation!

Security Checklist:

  • ✅ Use Docker isolation by default (don't set unsafe_native)
  • ✅ Use specific image tags (e.g., python:3.11-slim not python:latest)
  • ✅ Run containers with minimal network access (network = "none")
  • ✅ Set resource limits (memory_limit, cpu_limit)
  • ✅ Run as non-root user when possible
  • ✅ Validate all workflow inputs
  • ✅ Review workflows before execution
  • ❌ NEVER mount /var/run/docker.sock
  • ❌ NEVER use privileged mode
  • ❌ NEVER use unsafe_native for untrusted workflows
  • ❌ NEVER expose host network to containers

Trust Levels:

  • Trusted workflows: Created by your team, reviewed code
    • Can use unsafe_native if needed for performance
  • Community workflows: Public workflows from marketplace
    • Always use Docker isolation
    • Review before running
  • User-provided workflows: From external sources
    • Run in isolated environment
    • Strict resource limits
    • No network access

Troubleshooting

Workflow Won't Start

  • Check TOML syntax with a linter
  • Verify all dependencies exist
  • Check for circular dependencies

Step Keeps Failing

  • Review error message in execution logs
  • Check timeout settings
  • Verify tool arguments and permissions
  • Add retry policy if transient failures

Performance Issues

  • Use parallel execution where possible
  • Increase max_parallel if safe
  • Check for blocking dependencies
  • Reduce loop iterations or batch size

Template Variables Not Resolving

  • Verify step ID spelling
  • Check step completed successfully
  • Use correct JSON path syntax
  • Review execution logs for errors

Docker Container Issues

  • "Docker daemon not available"

    • Ensure Docker is running: docker ps
    • Check Docker permissions for workflow engine user
    • Verify Docker socket is accessible
  • "Image pull failed"

    • Check image name and tag are correct
    • Verify network connectivity for image pull
    • Use docker pull <image> manually to test
    • Consider using local images or private registry
  • "Permission denied" on volumes

    • Check file/directory permissions on host
    • Use correct user ID mapping (user field)
    • Ensure paths exist before mounting
  • Container timeout

    • Increase timeout value
    • Check if process is actually stuck or just slow
    • Review container logs for details
    • Consider resource limits (may be too low)

Inline Code Issues

  • Script syntax error

    • Test script locally first
    • Check for proper escaping in TOML (use ''' for multiline)
    • Verify language syntax matches specified language
  • "Command not found" in Docker mode

    • Verify the default Docker image includes required tools
    • Override with custom image: docker_image = "custom-image" (see the sketch after this list)
    • Install dependencies in code if needed
  • Native execution not working

    • Check unsafe_native = true is set
    • Verify interpreter is installed on host
    • Check PATH environment variable
    • Review workflow engine permissions
  • Output not captured

    • Ensure script writes to stdout (not files)
    • Use print() in Python, echo in Bash
    • Check for stderr output (may indicate errors)
    • Verify JSON format if expecting structured output
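
For the custom-image override mentioned above, a hedged sketch that pins a specific image in place of the default python:3.11-slim:

toml
[steps.inline_code]
language = "python"
docker_image = "python:3.12-slim"   # pinned override; choose an image that ships the tools your code needs
code = "import sys; print(sys.version)"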
