Workflow Orchestration
Orchestrate complex multi-step workflows with the Skill Engine's powerful workflow system.
Overview
The Workflow Orchestration system enables you to:
- Chain multiple tools together with dependencies
- Execute steps in parallel for maximum performance
- Add conditional logic with when clauses
- Handle errors gracefully with retry policies
- Loop over collections with foreach steps
- Run arbitrary Docker containers for complex processing (FFmpeg, data tools, etc.)
- Execute inline code in Python, Bash, Node.js, or Ruby
- Monitor in real-time via WebSocket and Web UI
New in v0.3.4: Docker container steps and inline code execution provide powerful ways to extend workflows without creating custom skills, while maintaining security through Docker isolation.
Quick Start
1. Create a Workflow
Create a .toml file in .workflows/:
version = "1"
name = "hello-workflow"
description = "My first workflow"
[[steps]]
id = "greet"
step_type = "skill"
skill = "python"
tool = "execute"
args = { script = "print('Hello, Workflow!')" }
2. Execute the Workflow
Via CLI:
skill workflow run .workflows/hello-workflow.toml
Via MCP:
mcp-cli call skill-engine/execute_workflow '{
"workflow": "hello-workflow",
"inputs": {}
}'
Via HTTP API:
curl -X POST http://localhost:3000/api/workflows/hello-workflow/execute \
-H "Content-Type: application/json" \
-d '{"inputs": {}}'Core Concepts
Workflow Structure
version = "1" # Workflow format version
name = "my-workflow" # Unique workflow name
description = "..." # Human-readable description
[config] # Optional configuration
max_execution_time = "30m" # Total workflow timeout
retry_policy = "exponential" # Default retry strategy
max_retries = 3 # Max retry attempts
on_failure = "stop" # Error handling: stop | continue
max_parallel = 10 # Max concurrent steps
[inputs] # Runtime parameters
param1 = { type = "string", required = true }
param2 = { type = "number", default = 100 }
[[steps]] # Step definitions
id = "step-1" # Unique step ID
step_type = "skill" # Step type (see below)
skill = "skill-name" # Skill to execute (for skill steps)
tool = "tool-name" # Tool to run (for skill steps)
args = { key = "value" } # Tool arguments
depends_on = [] # Dependencies (other step IDs)
when = "condition" # Optional conditional execution
timeout = "5m" # Step timeoutStep Types
The workflow engine supports six step types:
| Type | Description | Use Case |
|---|---|---|
| skill | Execute a skill tool | Reusable functionality from the skill marketplace |
| command | Run a shell command | Simple CLI operations |
| http | Make an HTTP request | API calls, webhooks |
| foreach | Loop over a collection | Batch processing, parallel operations |
| docker_container | Run an arbitrary Docker container | Complex tools (FFmpeg, data processing) |
| inline_code | Execute Python/Bash/Node/Ruby | Quick scripts without creating skills |
Skill Step
Execute a skill tool:
[[steps]]
id = "call-api"
step_type = "skill"
skill = "http"
tool = "request"
args = {
url = "https://api.example.com/data",
method = "GET"
}
Command Step
Run shell commands:
[[steps]]
id = "run-script"
step_type = "command"
command = "python scripts/process.py --input data.json"
timeout = "5m"HTTP Step
Make HTTP requests:
[[steps]]
id = "webhook"
step_type = "http"
[steps.http]
method = "POST"
url = "https://hooks.slack.com/services/..."
headers = { "Content-Type" = "application/json" }
body = { text = "Workflow completed!" }
Foreach Step
Loop over collections:
[[steps]]
id = "process-regions"
step_type = "foreach"
items = ["us-east-1", "eu-west-1", "ap-south-1"]
parallel = true
max_parallel = 3
max_iterations = 1000
[[steps.loop]]
id = "deploy-to-region"
step_type = "skill"
skill = "aws"
tool = "deploy"
args = { region = "{loop.item}" }
Docker Container Step
Execute arbitrary Docker containers:
[[steps]]
id = "video-processing"
step_type = "docker_container"
timeout = "10m"
[steps.docker]
image = "jrottenberg/ffmpeg:5-alpine"
command = ["-i", "/input/video.mp4", "-vf", "scale=1280:720", "/output/output.mp4"]
volumes = ["./input:/input", "./output:/output"]
memory_limit = "512m"
cpu_limit = 1.0
environment = { "QUALITY" = "high" }
working_dir = "/workspace"
network = "none" # Isolated by default for security
user = "1000:1000" # Run as non-rootSecurity Features:
- Containers run isolated by default (no network access)
- Privileged mode is blocked
- Docker socket access is blocked
- Resource limits are enforced
- Containers run as non-root user by default
Inline Code Step
Execute Python, Bash, Node.js, or Ruby code:
[[steps]]
id = "process-data"
step_type = "inline_code"
[steps.inline_code]
language = "python"
code = '''
import json
import sys
# Read input from stdin
data = json.loads(sys.stdin.read())
# Process data
result = {
"count": len(data),
"items": [item.upper() for item in data]
}
# Output as JSON
print(json.dumps(result))
'''
stdin = "{inputs.data}"
environment = { "DEBUG" = "true" }Execution Modes:
- Docker (default): Runs code in isolated container for security
- Native (unsafe): Runs directly on the host (use unsafe_native = true)
Supported Languages:
- python - Python 3.11 (default image: python:3.11-slim)
- bash - Bash shell (default image: alpine:latest)
- node - Node.js 20 (default image: node:20-alpine)
- ruby - Ruby 3.2 (default image: ruby:3.2-alpine)
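An inline step in any of these languages follows the same shape as the Python example above. The sketch below assumes the default node:20-alpine image and a hypothetical earlier step named fetch-data whose output is piped in via stdin:
[[steps]]
id = "summarize-json"
step_type = "inline_code"
depends_on = ["fetch-data"]
[steps.inline_code]
language = "node"
code = '''
// Read JSON from stdin and print a small summary as JSON
let raw = "";
process.stdin.on("data", (chunk) => (raw += chunk));
process.stdin.on("end", () => {
  const data = JSON.parse(raw);
  console.log(JSON.stringify({ keys: Object.keys(data).length }));
});
'''
stdin = "{fetch-data.output}"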
Using External Scripts:
[steps.inline_code]
language = "python"
script_file = ".workflows/scripts/process.py"
# Or use inline code with the 'code' field
Native Execution (Trusted Workflows Only):
[steps.inline_code]
language = "bash"
code = "cargo build --release"
unsafe_native = true # Runs on host, bypasses Docker isolation
working_dir = "."⚠️ Security Warning: Only use unsafe_native = true for trusted workflows. Native execution bypasses all Docker isolation and runs with the same permissions as the workflow engine.
Template Variables
Access data from previous steps and inputs:
# Workflow inputs
"{inputs.param_name}"
# Step outputs
"{step-id.output}"
"{step-id.output.nested.field}"
# Step metadata
"{step-id.metadata.key}"
"{step-id.status}"
"{step-id.duration_ms}"
# Loop context
"{loop.item}" # Current iteration value
"{loop.index}" # Current iteration index
# Execution context
"{execution_id}" # Unique execution IDDependencies and Execution Order
Steps with no dependencies run in parallel:
[[steps]]
id = "fetch-a"
# Runs immediately
[[steps]]
id = "fetch-b"
# Runs in parallel with fetch-a
[[steps]]
id = "combine"
depends_on = ["fetch-a", "fetch-b"]
# Waits for both to complete
The execution engine automatically:
- Computes execution levels via topological sort
- Executes independent steps in parallel
- Respects the max_parallel configuration limit
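A fuller sketch of the same fan-out/fan-in pattern is shown below. The URLs are placeholders, and feeding both outputs into the combining step through a templated stdin string is an assumption about how template expansion behaves; adjust it to your engine version:
[[steps]]
id = "fetch-a"
step_type = "http"
[steps.http]
method = "GET"
url = "https://api.example.com/a"
[[steps]]
id = "fetch-b"
step_type = "http"
[steps.http]
method = "GET"
url = "https://api.example.com/b"
[[steps]]
id = "combine"
step_type = "inline_code"
depends_on = ["fetch-a", "fetch-b"]
[steps.inline_code]
language = "python"
code = '''
import json
import sys
# Both fetches have completed before this step starts
merged = json.loads(sys.stdin.read())
print(json.dumps({"a_keys": len(merged["a"]), "b_keys": len(merged["b"])}))
'''
stdin = '{ "a": {fetch-a.output}, "b": {fetch-b.output} }'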
Conditional Execution
Use when to conditionally execute steps:
[[steps]]
id = "validate-data"
# Always runs
[[steps]]
id = "process-data"
when = "{validate-data.metadata.quality_score} > 0.9"
depends_on = ["validate-data"]
# Only runs if quality score is high
[[steps]]
id = "send-alert"
when = "{validate-data.status} == 'failed'"
depends_on = ["validate-data"]
# Only runs if validation failed
Supported Operators:
- Comparison: ==, !=, >, <, >=, <=
- Logical: &&, ||, !
- Membership: in, not in
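These operators can be combined in a single when expression. The step and field names below are illustrative and assume earlier steps that expose the referenced output and metadata:
[[steps]]
id = "deploy"
depends_on = ["validate-data", "check-environment"]
when = "{validate-data.metadata.quality_score} >= 0.9 && {check-environment.output.env} == 'production'"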
Error Handling and Retries
Global Error Strategy:
[config]
on_failure = "stop" # Stop on first error (default)
# or
on_failure = "continue" # Continue with remaining stepsPer-Step Retry:
[[steps]]
id = "flaky-api"
[steps.retry]
max_attempts = 3
backoff = "exponential" # none | linear | exponential | jitter
initial_delay = "1s"
max_delay = "5m"Backoff Strategies:
- none: Immediate retry
- linear: Fixed delay (1s, 1s, 1s, ...)
- exponential: Doubling delay (1s, 2s, 4s, 8s, ...)
- jitter: Exponential with randomization
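These pieces compose naturally: with on_failure = "continue", a later step can check a failed step's status and act as a fallback. The sketch below is illustrative; the URL is a placeholder and the fallback command is a stand-in for real recovery logic:
[config]
on_failure = "continue"
[[steps]]
id = "flaky-api"
step_type = "http"
[steps.http]
method = "GET"
url = "https://api.example.com/data"
[steps.retry]
max_attempts = 3
backoff = "jitter"
initial_delay = "1s"
[[steps]]
id = "use-cached-data"
step_type = "command"
command = "echo 'primary source unavailable, falling back to cache'"
depends_on = ["flaky-api"]
when = "{flaky-api.status} == 'failed'"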
Advanced Features
Parallel Health Checks
[[steps]]
id = "health-checks"
step_type = "foreach"
items = ["service-a", "service-b", "service-c"]
parallel = true
max_parallel = 10
[[steps.loop]]
id = "check-service"
step_type = "http"
[steps.loop.http]
method = "GET"
url = "https://{loop.item}.example.com/health"
timeout = "5s"Conditional Branching
[[steps]]
id = "check-environment"
[[steps]]
id = "deploy-to-prod"
when = "{check-environment.output.env} == 'production'"
depends_on = ["check-environment"]
[[steps]]
id = "deploy-to-staging"
when = "{check-environment.output.env} == 'staging'"
depends_on = ["check-environment"]Nested Data Access
[[steps]]
id = "fetch-user"
# Returns: { "output": { "user": { "id": 123, "name": "Alice" } } }
[[steps]]
id = "greet-user"
args = {
user_id = "{fetch-user.output.user.id}",
name = "{fetch-user.output.user.name}"
}
Video Processing Pipeline
version = "1"
name = "video-transcoding"
description = "Transcode videos to multiple resolutions"
[[steps]]
id = "transcode-720p"
step_type = "docker_container"
timeout = "30m"
[steps.docker]
image = "jrottenberg/ffmpeg:5-alpine"
volumes = ["{inputs.input_dir}:/input", "{inputs.output_dir}:/output"]
command = [
"-i", "/input/{inputs.filename}",
"-vf", "scale=1280:720",
"-c:v", "libx264",
"-crf", "23",
"/output/{inputs.filename}_720p.mp4"
]
memory_limit = "2g"
cpu_limit = 2.0
[[steps]]
id = "transcode-480p"
step_type = "docker_container"
timeout = "30m"
[steps.docker]
image = "jrottenberg/ffmpeg:5-alpine"
volumes = ["{inputs.input_dir}:/input", "{inputs.output_dir}:/output"]
command = [
"-i", "/input/{inputs.filename}",
"-vf", "scale=854:480",
"-c:v", "libx264",
"-crf", "23",
"/output/{inputs.filename}_480p.mp4"
]
memory_limit = "1g"
cpu_limit = 1.0
[[steps]]
id = "generate-thumbnails"
step_type = "inline_code"
depends_on = ["transcode-720p"]
[steps.inline_code]
language = "bash"
code = '''
ffmpeg -i "{inputs.output_dir}/{inputs.filename}_720p.mp4" \
-vf "fps=1/10,scale=320:180" \
"{inputs.output_dir}/{inputs.filename}_thumb_%03d.jpg"
'''
Data Processing with Python
[[steps]]
id = "fetch-data"
step_type = "http"
[steps.http]
method = "GET"
url = "https://api.example.com/data"
[[steps]]
id = "transform-data"
step_type = "inline_code"
depends_on = ["fetch-data"]
[steps.inline_code]
language = "python"
code = '''
import json
import sys
# Read data from previous step
data = json.loads(sys.stdin.read())
# Transform data
transformed = [
{
"id": item["id"],
"name": item["name"].upper(),
"processed_at": "2024-01-01"
}
for item in data["items"]
]
# Output result
print(json.dumps({"transformed": transformed}))
'''
stdin = "{fetch-data.output}"
environment = { "TZ" = "UTC" }
[[steps]]
id = "upload-results"
step_type = "http"
depends_on = ["transform-data"]
[steps.http]
method = "POST"
url = "https://api.example.com/results"
body = "{transform-data.output.transformed}"Parallel Batch Processing
[[steps]]
id = "list-files"
step_type = "inline_code"
[steps.inline_code]
language = "bash"
code = "ls -1 /data/input/*.csv | xargs -n1 basename"
[[steps]]
id = "process-files"
step_type = "foreach"
depends_on = ["list-files"]
items = "{list-files.output}" # Dynamic list from previous step
parallel = true
max_parallel = 5
[[steps.loop]]
id = "process-csv"
step_type = "docker_container"
[steps.loop.docker]
image = "python:3.11-slim"
volumes = ["/data:/data"]
command = ["python", "-c", '''
import pandas as pd
df = pd.read_csv(f"/data/input/{loop.item}")
df["processed"] = True
df.to_csv(f"/data/output/{loop.item}", index=False)
''']
Resource Limits
[config]
max_execution_time = "1h" # Total workflow timeout
max_concurrent_steps = 20 # Max steps running simultaneously
max_total_steps = 1000 # Max steps in workflow
max_loop_iterations = 10000 # Max iterations per loop
[[steps]]
timeout = "5m" # Per-step timeout
memory_limit = "2GB" # Step memory limit (Docker/WASM)
cpu_limit = "2" # CPU allocation (Docker)Real-Time Monitoring
WebSocket Connection
Connect to workflow execution via WebSocket:
const ws = new WebSocket(`ws://localhost:3000/ws/executions/${executionId}`);
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case 'workflow_started':
console.log('Workflow started:', data.workflow_name);
break;
case 'step_started':
console.log('Step started:', data.step_id);
break;
case 'step_completed':
console.log('Step completed:', data.step_id, data.output);
break;
case 'step_failed':
console.error('Step failed:', data.step_id, data.error);
break;
case 'workflow_completed':
console.log('Workflow completed:', data.status);
ws.close();
break;
}
};
// Send commands to server
ws.send(JSON.stringify({ command: 'cancel' }));
Event Types
- workflow_started - Execution began
- step_started - Step execution started
- step_progress - Step progress update
- step_completed - Step finished successfully
- step_failed - Step failed with error
- step_skipped - Step skipped (condition not met)
- workflow_completed - Entire workflow finished
- workflow_cancelled - Workflow was cancelled
API Reference
HTTP Endpoints
List Workflows:
GET /api/workflows
Get Workflow:
GET /api/workflows/:name
Create Workflow:
POST /api/workflows
Content-Type: application/json
{
"name": "my-workflow",
"content": "... TOML content ...",
"format": "toml"
}
Execute Workflow:
POST /api/workflows/:name/execute
Content-Type: application/json
{
"inputs": {
"param1": "value1",
"param2": 123
},
"async_execution": true
}
Get Execution Status:
GET /api/executions/:execution_id
Cancel Execution:
POST /api/executions/:execution_id/cancel
List Executions:
GET /api/executions?workflow_name=my-workflow&status=completed&limit=10
MCP Tools
execute_workflow:
{
"workflow": "workflow-name", // or file path
"inputs": { "key": "value" },
"async_execution": true
}
get_workflow_status:
{
"execution_id": "uuid"
}
list_workflows:
{
"name_pattern": ".*pipeline.*" // optional regex
}
list_executions:
{
"workflow_name": "my-workflow",
"status": "completed",
"limit": 10
}
Example Workflows
See .workflows/examples/ for production-ready examples:
- data-pipeline.toml - ETL workflow with validation and error handling
- k8s-deploy.toml - Kubernetes deployment with health checks and rollback
- ai-research.toml - Multi-source research with AI summarization
Best Practices
1. Design for Idempotency
Ensure steps can be safely retried:
[[steps]]
args = {
operation = "upsert", # Not insert - use upsert/merge
idempotency_key = "{execution_id}"
}
2. Set Appropriate Timeouts
[[steps]]
timeout = "5m" # Don't use default - be explicit3. Use Meaningful Step IDs
[[steps]]
id = "fetch-customer-data" # Good
# not: "step1", "temp", "abc"4. Add Metadata for Observability
[[steps]]
# Your tool should emit metadata for debugging
# metadata = { row_count = 1234, duration_ms = 567 }
5. Validate Inputs Early
[[steps]]
id = "validate-inputs"
# First step - fail fast if inputs are invalid
6. Handle Partial Failures
[config]
on_failure = "continue" # For independent steps
[[steps]]
when = "{step-a.status} == 'completed'" # Check status before using output7. Use Loops Wisely
[[steps]]
max_iterations = 100 # Set reasonable limit
parallel = true # Use parallel when possible
max_parallel = 10 # Limit concurrency
8. Security Best Practices
For Docker Container Steps:
[steps.docker]
image = "python:3.11-slim" # Use specific tags, not :latest
network = "none" # Default - no network access
user = "1000:1000" # Run as non-root
memory_limit = "512m" # Set resource limits
cpu_limit = 1.0
# NEVER mount docker.sock or use privileged mode
For Inline Code Steps:
[steps.inline_code]
# Default: Runs in Docker (secure)
language = "python"
code = "..."
# ONLY use unsafe_native for trusted workflows
# unsafe_native = true # Bypasses Docker isolation!
Security Checklist:
- ✅ Use Docker isolation by default (don't set unsafe_native)
- ✅ Use specific image tags (e.g., python:3.11-slim, not python:latest)
- ✅ Run containers with minimal network access (network = "none")
- ✅ Set resource limits (memory_limit, cpu_limit)
- ✅ Run as non-root user when possible
- ✅ Validate all workflow inputs
- ✅ Review workflows before execution
- ❌ NEVER mount /var/run/docker.sock
- ❌ NEVER use privileged mode
- ❌ NEVER use unsafe_native for untrusted workflows
- ❌ NEVER expose the host network to containers
Trust Levels:
- Trusted workflows: Created by your team, reviewed code
  - Can use unsafe_native if needed for performance
- Community workflows: Public workflows from the marketplace
  - Always use Docker isolation
  - Review before running
- User-provided workflows: From external sources
  - Run in an isolated environment
  - Strict resource limits
  - No network access
Troubleshooting
Workflow Won't Start
- Check TOML syntax with a linter
- Verify all dependencies exist
- Check for circular dependencies
Step Keeps Failing
- Review error message in execution logs
- Check timeout settings
- Verify tool arguments and permissions
- Add retry policy if transient failures
Performance Issues
- Use parallel execution where possible
- Increase max_parallel if safe
- Reduce loop iterations or batch size
Template Variables Not Resolving
- Verify step ID spelling
- Check step completed successfully
- Use correct JSON path syntax
- Review execution logs for errors
Docker Container Issues
"Docker daemon not available"
- Ensure Docker is running: docker ps
- Check Docker permissions for the workflow engine user
- Verify the Docker socket is accessible
"Image pull failed"
- Check image name and tag are correct
- Verify network connectivity for image pull
- Use docker pull <image> manually to test
- Consider using local images or a private registry
"Permission denied" on volumes
- Check file/directory permissions on host
- Use correct user ID mapping (user field)
- Ensure paths exist before mounting
Container timeout
- Increase timeout value
- Check if process is actually stuck or just slow
- Review container logs for details
- Consider resource limits (may be too low)
Inline Code Issues
Script syntax error
- Test script locally first
- Check for proper escaping in TOML (use ''' for multiline strings)
- Verify the code matches the specified language
"Command not found" in Docker mode
- Verify the default Docker image includes required tools
- Override with a custom image: docker_image = "custom-image"
Native execution not working
- Check that unsafe_native = true is set
- Verify the interpreter is installed on the host
- Check the PATH environment variable
- Review workflow engine permissions
Output not captured
- Ensure script writes to stdout (not files)
- Use print() in Python, echo in Bash
- Check for stderr output (may indicate errors)
- Verify JSON format if expecting structured output