In Part 1, we introduced the concept of building a custom workflow system. Now it's time to get our hands dirty! In this post, we'll build the core workflow engine - the heart of our system that executes node-based workflows.
By the end of this post, you'll have:
We've organized our solution into focused projects for maintainability:
Mostlylucid.Workflow.Shared/ # Shared models and DTOs
├── Models/
│ ├── WorkflowNode.cs # Node definition
│ ├── NodeConnection.cs # Connections between nodes
│ ├── WorkflowDefinition.cs # Complete workflow definition
│ └── WorkflowExecution.cs # Execution tracking
Mostlylucid.Workflow.Engine/ # Core execution engine
├── Interfaces/
│ ├── IWorkflowNode.cs # Node interface
│ ├── IWorkflowExecutor.cs # Executor interface
│ └── INodeRegistry.cs # Node registry interface
├── Execution/
│ ├── NodeRegistry.cs # Registry for node types
│ └── WorkflowExecutor.cs # Main execution engine
└── Nodes/
├── BaseWorkflowNode.cs # Base node implementation
├── HttpRequestNode.cs # HTTP API calls
├── TransformNode.cs # Data transformation
└── DelayNode.cs # Delay execution
Mostlylucid.Shared/Entities/ # Database entities (EF Core)
├── WorkflowDefinitionEntity.cs
├── WorkflowExecutionEntity.cs
└── WorkflowTriggerStateEntity.cs
The WorkflowNode is the building block of all workflows. It's designed to be completely serializable to JSON:
public class WorkflowNode
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string Type { get; set; } = string.Empty;
public string Name { get; set; } = string.Empty;
public string? Description { get; set; }
// Configuration
public Dictionary<string, object> Inputs { get; set; } = new();
public Dictionary<string, string> Outputs { get; set; } = new();
public Dictionary<string, string> Conditions { get; set; } = new();
// Visual properties
public NodePosition Position { get; set; } = new();
public NodeStyle Style { get; set; } = new();
}
public class NodeStyle
{
public string BackgroundColor { get; set; } = "#3B82F6";
public string TextColor { get; set; } = "#FFFFFF";
public string BorderColor { get; set; } = "#2563EB";
public string? Icon { get; set; }
public int Width { get; set; } = 200;
public int Height { get; set; } = 100;
}
Key Design Decisions:
{{variable}} syntax for dynamic dataConnections define how data flows between nodes:
public class NodeConnection
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string SourceNodeId { get; set; } = string.Empty;
public string TargetNodeId { get; set; } = string.Empty;
public string SourceOutput { get; set; } = "default";
public string TargetInput { get; set; } = "default";
public string? Condition { get; set; }
public string? Label { get; set; }
}
This allows for:
The complete workflow:
public class WorkflowDefinition
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string Name { get; set; } = string.Empty;
public string? Description { get; set; }
public int Version { get; set; } = 1;
public List<WorkflowNode> Nodes { get; set; } = new();
public List<NodeConnection> Connections { get; set; } = new();
public string? StartNodeId { get; set; }
public List<string> Tags { get; set; } = new();
public bool IsEnabled { get; set; } = true;
public Dictionary<string, object>? Variables { get; set; }
}
The WorkflowExecutor is the brain of our system. It interprets workflow definitions and executes them.
public async Task<WorkflowExecution> ExecuteAsync(
WorkflowDefinition workflow,
Dictionary<string, object>? inputData = null,
string? triggeredBy = null,
CancellationToken cancellationToken = default)
{
var execution = new WorkflowExecution
{
Id = Guid.NewGuid().ToString(),
WorkflowId = workflow.Id,
Status = WorkflowExecutionStatus.Running,
StartedAt = DateTime.UtcNow,
InputData = inputData,
Context = new Dictionary<string, object>(inputData ?? new())
};
try
{
// Validate workflow
var validationErrors = ValidateWorkflow(workflow);
if (validationErrors.Any())
{
throw new InvalidOperationException(
$"Workflow validation failed: {string.Join(", ", validationErrors)}");
}
// Create execution context
var context = new WorkflowExecutionContext
{
Execution = execution,
Workflow = workflow,
Data = execution.Context,
Services = _serviceProvider
};
// Find and execute start node
var startNode = workflow.Nodes.FirstOrDefault(n => n.Id == workflow.StartNodeId);
if (startNode == null)
{
throw new InvalidOperationException("No start node found");
}
await ExecuteNodeRecursiveAsync(startNode, context, cancellationToken);
execution.Status = WorkflowExecutionStatus.Completed;
execution.CompletedAt = DateTime.UtcNow;
execution.OutputData = context.Data;
}
catch (Exception ex)
{
execution.Status = WorkflowExecutionStatus.Failed;
execution.ErrorMessage = ex.Message;
// ... error handling
}
return execution;
}
The key to our execution model is recursion. Each node executes, then triggers its downstream nodes:
private async Task ExecuteNodeRecursiveAsync(
WorkflowNode nodeConfig,
WorkflowExecutionContext context,
CancellationToken cancellationToken)
{
// Get node implementation from registry
var node = _nodeRegistry.GetNode(nodeConfig.Type);
if (node == null)
{
throw new InvalidOperationException($"Node type '{nodeConfig.Type}' not registered");
}
// Execute the node
var result = await node.ExecuteAsync(nodeConfig, context, cancellationToken);
// Record execution history
context.Execution.NodeExecutions.Add(result);
// Store outputs for downstream nodes
if (result.OutputData != null)
{
context.NodeOutputs[nodeConfig.Id] = result.OutputData;
// Merge into shared context
foreach (var (key, value) in result.OutputData)
{
context.Data[key] = value;
}
}
// Handle failure with error routing
if (result.Status == NodeExecutionStatus.Failed)
{
var errorConnection = context.Workflow.Connections
.FirstOrDefault(c => c.SourceNodeId == nodeConfig.Id &&
c.SourceOutput == "error");
if (errorConnection != null)
{
// Route to error handler
var errorNode = context.Workflow.Nodes
.FirstOrDefault(n => n.Id == errorConnection.TargetNodeId);
if (errorNode != null)
{
await ExecuteNodeRecursiveAsync(errorNode, context, cancellationToken);
return;
}
}
throw new Exception($"Node {nodeConfig.Id} failed: {result.ErrorMessage}");
}
// Find and execute downstream nodes
var outgoingConnections = context.Workflow.Connections
.Where(c => c.SourceNodeId == nodeConfig.Id && c.SourceOutput != "error")
.ToList();
foreach (var connection in outgoingConnections)
{
// Check connection condition
if (!string.IsNullOrEmpty(connection.Condition))
{
if (!EvaluateCondition(connection.Condition, context))
{
continue; // Skip this connection
}
}
// Execute target node
var targetNode = context.Workflow.Nodes
.FirstOrDefault(n => n.Id == connection.TargetNodeId);
if (targetNode != null)
{
await ExecuteNodeRecursiveAsync(targetNode, context, cancellationToken);
}
}
}
Why Recursive?
The NodeRegistry allows dynamic registration of node types:
public class NodeRegistry : INodeRegistry
{
private readonly Dictionary<string, Type> _nodeTypes = new();
private readonly IServiceProvider _serviceProvider;
public void RegisterNode<TNode>(string nodeType) where TNode : IWorkflowNode
{
_nodeTypes[nodeType] = typeof(TNode);
}
public IWorkflowNode? GetNode(string nodeType)
{
if (!_nodeTypes.TryGetValue(nodeType, out var type))
{
return null;
}
// Try DI first, fallback to Activator
return _serviceProvider.GetService(type) as IWorkflowNode
?? Activator.CreateInstance(type) as IWorkflowNode;
}
}
This design allows:
Makes HTTP API calls with full configuration:
public class HttpRequestNode : BaseWorkflowNode
{
private readonly IHttpClientFactory _httpClientFactory;
public override string NodeType => "HttpRequest";
public override async Task<NodeExecutionResult> ExecuteAsync(
WorkflowNode nodeConfig,
WorkflowExecutionContext context,
CancellationToken cancellationToken = default)
{
var resolvedInputs = ResolveTemplates(nodeConfig.Inputs, context);
var url = resolvedInputs.GetValueOrDefault("url")?.ToString();
var method = resolvedInputs.GetValueOrDefault("method")?.ToString() ?? "GET";
var headers = resolvedInputs.GetValueOrDefault("headers") as Dictionary<string, object>;
var body = resolvedInputs.GetValueOrDefault("body");
var client = _httpClientFactory.CreateClient();
// Add headers
if (headers != null)
{
foreach (var (key, value) in headers)
{
client.DefaultRequestHeaders.TryAddWithoutValidation(
key, value?.ToString() ?? string.Empty);
}
}
// Make request
HttpResponseMessage response = method.ToUpper() switch
{
"GET" => await client.GetAsync(url, cancellationToken),
"POST" => await client.PostAsJsonAsync(url, body, cancellationToken),
"PUT" => await client.PutAsJsonAsync(url, body, cancellationToken),
"DELETE" => await client.DeleteAsync(url, cancellationToken),
_ => throw new InvalidOperationException($"Unsupported method: {method}")
};
var responseBody = await response.Content.ReadAsStringAsync(cancellationToken);
var outputData = new Dictionary<string, object>
{
["statusCode"] = (int)response.StatusCode,
["body"] = JsonSerializer.Deserialize<Dictionary<string, object>>(responseBody)
?? responseBody,
["isSuccess"] = response.IsSuccessStatusCode
};
return CreateSuccessResult(nodeConfig, outputData, resolvedInputs);
}
}
Example Usage:
{
"type": "HttpRequest",
"inputs": {
"url": "https://api.github.com/repos/{{owner}}/{{repo}}",
"method": "GET",
"headers": {
"Authorization": "Bearer {{apiToken}}",
"Accept": "application/vnd.github+json"
}
},
"outputs": {
"repoData": "{{body}}",
"statusCode": "{{statusCode}}"
}
}
Simple data transformations:
public class TransformNode : BaseWorkflowNode
{
public override string NodeType => "Transform";
public override async Task<NodeExecutionResult> ExecuteAsync(
WorkflowNode nodeConfig,
WorkflowExecutionContext context,
CancellationToken cancellationToken = default)
{
var resolvedInputs = ResolveTemplates(nodeConfig.Inputs, context);
var operation = resolvedInputs.GetValueOrDefault("operation")?.ToString();
var inputData = resolvedInputs.GetValueOrDefault("data");
object result = operation?.ToLower() switch
{
"uppercase" => inputData?.ToString()?.ToUpper() ?? string.Empty,
"lowercase" => inputData?.ToString()?.ToLower() ?? string.Empty,
"trim" => inputData?.ToString()?.Trim() ?? string.Empty,
"length" => inputData?.ToString()?.Length ?? 0,
"json_parse" => JsonSerializer.Deserialize<Dictionary<string, object>>(
inputData?.ToString() ?? "{}"),
"json_stringify" => JsonSerializer.Serialize(inputData),
_ => inputData ?? string.Empty
};
var outputData = new Dictionary<string, object>
{
["result"] = result
};
return CreateSuccessResult(nodeConfig, outputData, resolvedInputs);
}
}
Adds delays to workflows:
public class DelayNode : BaseWorkflowNode
{
public override string NodeType => "Delay";
public override async Task<NodeExecutionResult> ExecuteAsync(
WorkflowNode nodeConfig,
WorkflowExecutionContext context,
CancellationToken cancellationToken = default)
{
var resolvedInputs = ResolveTemplates(nodeConfig.Inputs, context);
var durationMs = int.Parse(
resolvedInputs.GetValueOrDefault("durationMs")?.ToString() ?? "0");
await Task.Delay(durationMs, cancellationToken);
var outputData = new Dictionary<string, object>
{
["delayedMs"] = durationMs,
["completedAt"] = DateTime.UtcNow.ToString("O")
};
return CreateSuccessResult(nodeConfig, outputData, resolvedInputs);
}
}
Nodes support template variables using {{variable}} syntax. The BaseWorkflowNode provides helper methods:
protected string ResolveTemplate(string template, WorkflowExecutionContext context)
{
if (string.IsNullOrEmpty(template)) return template;
var result = template;
var matches = Regex.Matches(template, @"\{\{([^}]+)\}\}");
foreach (Match match in matches)
{
var variable = match.Groups[1].Value.Trim();
if (context.Data.TryGetValue(variable, out var value))
{
result = result.Replace(match.Value, value?.ToString() ?? string.Empty);
}
}
return result;
}
This allows powerful dynamic workflows:
{
"type": "HttpRequest",
"inputs": {
"url": "{{apiBaseUrl}}/users/{{userId}}/posts",
"headers": {
"Authorization": "Bearer {{authToken}}"
}
}
}
We use Entity Framework Core with PostgreSQL for persistence:
[Table("workflow_definitions")]
public class WorkflowDefinitionEntity
{
[Key]
public int Id { get; set; }
[Required]
[MaxLength(100)]
public string WorkflowId { get; set; } = string.Empty;
[Required]
[MaxLength(200)]
public string Name { get; set; } = string.Empty;
[Column(TypeName = "jsonb")]
public string DefinitionJson { get; set; } = string.Empty;
public bool IsEnabled { get; set; } = true;
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
public DateTime UpdatedAt { get; set; } = DateTime.UtcNow;
public ICollection<WorkflowExecutionEntity> Executions { get; set; } = new List<WorkflowExecutionEntity>();
}
Why JSONB?
Here's a complete workflow that fetches GitHub repo data and transforms it:
{
"id": "github-repo-workflow",
"name": "GitHub Repository Info Fetcher",
"startNodeId": "fetch-repo",
"nodes": [
{
"id": "fetch-repo",
"type": "HttpRequest",
"name": "Fetch Repository",
"inputs": {
"url": "https://api.github.com/repos/{{owner}}/{{repo}}",
"method": "GET",
"headers": {
"Accept": "application/vnd.github+json"
}
},
"outputs": {
"repoData": "{{body}}"
},
"position": { "x": 100, "y": 100 },
"style": { "backgroundColor": "#10B981", "icon": "🔍" }
},
{
"id": "extract-name",
"type": "Transform",
"name": "Extract Repo Name",
"inputs": {
"operation": "json_stringify",
"data": "{{repoData}}"
},
"position": { "x": 100, "y": 250 },
"style": { "backgroundColor": "#3B82F6", "icon": "🔄" }
}
],
"connections": [
{
"id": "conn-1",
"sourceNodeId": "fetch-repo",
"targetNodeId": "extract-name",
"sourceOutput": "default",
"label": "On Success"
}
],
"variables": {
"owner": "scottgal",
"repo": "mostlylucidweb"
}
}
We now have a fully functional workflow engine! But it's only accessible programmatically. In Part 3, we'll build the visual editor using HTMX, Alpine.js, TailwindCSS, and DaisyUI.
We'll create:
In this post, we've built the core of our workflow system:
The beauty of this design is its simplicity. Nodes are just classes implementing an interface. Workflows are just JSON. Execution is just recursion. Yet from these simple building blocks, we can create powerful automations.
In the next post, we'll bring this to life with a stunning visual interface!
All the code from this post is available in the Mostlylucid repository:
Mostlylucid.Workflow.Shared/Models/Mostlylucid.Workflow.Engine/Mostlylucid.Shared/Entities/Stay tuned for Part 3!
© 2025 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.