This is a viewer only at the moment; see the article on how this works.
To update the preview hit Ctrl-Alt-R (or ⌘-Alt-R on Mac) or Enter to refresh. The Save icon lets you save the markdown file to disk
This is a preview from the server running through my markdig pipeline
Wednesday, 15 January 2025
在第一部分中，我们引入了构建定制工作流系统的概念。
现在是时候动手实践了！
多个内置节点类型
Mostlylucid.Workflow.Shared/ # Shared models and DTOs
├── Models/
│ ├── WorkflowNode.cs # Node definition
│ ├── NodeConnection.cs # Connections between nodes
│ ├── WorkflowDefinition.cs # Complete workflow definition
│ └── WorkflowExecution.cs # Execution tracking
Mostlylucid.Workflow.Engine/ # Core execution engine
├── Interfaces/
│ ├── IWorkflowNode.cs # Node interface
│ ├── IWorkflowExecutor.cs # Executor interface
│ └── INodeRegistry.cs # Node registry interface
├── Execution/
│ ├── NodeRegistry.cs # Registry for node types
│ └── WorkflowExecutor.cs # Main execution engine
└── Nodes/
├── BaseWorkflowNode.cs # Base node implementation
├── HttpRequestNode.cs # HTTP API calls
├── TransformNode.cs # Data transformation
└── DelayNode.cs # Delay execution
Mostlylucid.Shared/Entities/ # Database entities (EF Core)
├── WorkflowDefinitionEntity.cs
├── WorkflowExecutionEntity.cs
└── WorkflowTriggerStateEntity.cs
核心模型：WorkflowNode（工作流节点）
/// <summary>
/// One node of a workflow: identity, type, configuration maps and visual placement.
/// </summary>
public class WorkflowNode
{
    /// <summary>Node identifier; defaults to a newly generated GUID rendered as a string.</summary>
    public string Id { get; set; } = Guid.NewGuid().ToString();

    /// <summary>Node type key (for example "HttpRequest"); defaults to the empty string.</summary>
    public string Type { get; set; } = "";

    /// <summary>Display name; defaults to the empty string.</summary>
    public string Name { get; set; } = "";

    /// <summary>Optional free-text description.</summary>
    public string? Description { get; set; }

    // --- Configuration ---

    /// <summary>Input values keyed by input name; starts empty.</summary>
    public Dictionary<string, object> Inputs { get; set; } = new Dictionary<string, object>();

    /// <summary>Output mappings keyed by output name; starts empty.</summary>
    public Dictionary<string, string> Outputs { get; set; } = new Dictionary<string, string>();

    /// <summary>Condition strings keyed by name; starts empty.</summary>
    public Dictionary<string, string> Conditions { get; set; } = new Dictionary<string, string>();

    // --- Visual properties ---

    /// <summary>Position of the node; defaults to a new <see cref="NodePosition"/>.</summary>
    public NodePosition Position { get; set; } = new NodePosition();

    /// <summary>Styling of the node; defaults to a new <see cref="NodeStyle"/>.</summary>
    public NodeStyle Style { get; set; } = new NodeStyle();
}
/// <summary>
/// Visual styling values for a node: three colour strings, an optional icon and a size.
/// </summary>
public class NodeStyle
{
    /// <summary>Background colour string; defaults to "#3B82F6".</summary>
    public string BackgroundColor { get; set; } = "#3B82F6";

    /// <summary>Text colour string; defaults to "#FFFFFF".</summary>
    public string TextColor { get; set; } = "#FFFFFF";

    /// <summary>Border colour string; defaults to "#2563EB".</summary>
    public string BorderColor { get; set; } = "#2563EB";

    /// <summary>Optional icon; null unless set.</summary>
    public string? Icon { get; set; }

    /// <summary>Node width; defaults to 200.</summary>
    public int Width { get; set; } = 200;

    /// <summary>Node height; defaults to 100.</summary>
    public int Height { get; set; } = 100;
}
缩略
{{variable}}:节点知道如何自我塑造:值可用
/// <summary>
/// A directed link from a named output of one node to a named input of another.
/// </summary>
public class NodeConnection
{
    /// <summary>Connection identifier; defaults to a newly generated GUID rendered as a string.</summary>
    public string Id { get; set; } = Guid.NewGuid().ToString();

    /// <summary>Id of the node the connection leaves from; defaults to the empty string.</summary>
    public string SourceNodeId { get; set; } = "";

    /// <summary>Id of the node the connection leads to; defaults to the empty string.</summary>
    public string TargetNodeId { get; set; } = "";

    /// <summary>Name of the source node's output; defaults to "default".</summary>
    public string SourceOutput { get; set; } = "default";

    /// <summary>Name of the target node's input; defaults to "default".</summary>
    public string TargetInput { get; set; } = "default";

    /// <summary>Optional condition string attached to the connection.</summary>
    public string? Condition { get; set; }

    /// <summary>Optional label for the connection.</summary>
    public string? Label { get; set; }
}
动态数据语法
为清晰度而命名的投入/产出
/// <summary>
/// A complete workflow: metadata, its nodes, the connections between them and optional variables.
/// </summary>
public class WorkflowDefinition
{
    /// <summary>Workflow identifier; defaults to a newly generated GUID rendered as a string.</summary>
    public string Id { get; set; } = Guid.NewGuid().ToString();

    /// <summary>Workflow name; defaults to the empty string.</summary>
    public string Name { get; set; } = "";

    /// <summary>Optional free-text description.</summary>
    public string? Description { get; set; }

    /// <summary>Version number; defaults to 1.</summary>
    public int Version { get; set; } = 1;

    /// <summary>Nodes that make up the workflow; starts empty.</summary>
    public List<WorkflowNode> Nodes { get; set; } = new List<WorkflowNode>();

    /// <summary>Connections between nodes; starts empty.</summary>
    public List<NodeConnection> Connections { get; set; } = new List<NodeConnection>();

    /// <summary>Id of the node to start from; null unless set.</summary>
    public string? StartNodeId { get; set; }

    /// <summary>Tags attached to the workflow; starts empty.</summary>
    public List<string> Tags { get; set; } = new List<string>();

    /// <summary>Enabled flag; defaults to true.</summary>
    public bool IsEnabled { get; set; } = true;

    /// <summary>Optional variables keyed by name; null unless set.</summary>
    public Dictionary<string, object>? Variables { get; set; }
}
工作流执行器（WorkflowExecutor）——完整的工作流程：
/// <summary>
/// Validates the workflow, runs it from its start node and returns the execution record.
/// Any exception raised inside the run is captured on the record (status Failed plus the
/// exception message) instead of propagating to the caller.
/// </summary>
/// <param name="workflow">Definition to execute.</param>
/// <param name="inputData">Optional initial data; it is copied into the execution's shared context.</param>
/// <param name="triggeredBy">Optional trigger label; not read by the code visible here.</param>
/// <param name="cancellationToken">Token passed on to node execution.</param>
/// <returns>The execution record, with status Completed or Failed.</returns>
public async Task<WorkflowExecution> ExecuteAsync(
    WorkflowDefinition workflow,
    Dictionary<string, object>? inputData = null,
    string? triggeredBy = null,
    CancellationToken cancellationToken = default)
{
    var run = new WorkflowExecution
    {
        Id = Guid.NewGuid().ToString(),
        WorkflowId = workflow.Id,
        Status = WorkflowExecutionStatus.Running,
        StartedAt = DateTime.UtcNow,
        InputData = inputData,
        // The shared context starts as a copy of the input, or empty when none was given.
        Context = new Dictionary<string, object>(inputData ?? new())
    };

    try
    {
        // Refuse to run a definition that fails validation.
        var problems = ValidateWorkflow(workflow);
        if (problems.Any())
        {
            var joined = string.Join(", ", problems);
            throw new InvalidOperationException($"Workflow validation failed: {joined}");
        }

        // The context shares the execution's data dictionary with every node.
        var runContext = new WorkflowExecutionContext
        {
            Execution = run,
            Workflow = workflow,
            Data = run.Context,
            Services = _serviceProvider
        };

        // Locate the entry point by id; a missing node is a hard error.
        var entryNode = workflow.Nodes.FirstOrDefault(candidate => candidate.Id == workflow.StartNodeId)
            ?? throw new InvalidOperationException("No start node found");

        await ExecuteNodeRecursiveAsync(entryNode, runContext, cancellationToken);

        run.Status = WorkflowExecutionStatus.Completed;
        run.CompletedAt = DateTime.UtcNow;
        run.OutputData = runContext.Data;
    }
    catch (Exception failure)
    {
        run.Status = WorkflowExecutionStatus.Failed;
        run.ErrorMessage = failure.Message;
        // ... error handling
    }

    return run;
}
WorkflowExecutor 是我们系统的大脑。
/// <summary>
/// Executes one node, records its result, then follows its outgoing connections depth-first.
/// </summary>
/// <remarks>
/// A failed node is routed to the target of its "error" connection when one exists; otherwise
/// the failure is raised as an <see cref="InvalidOperationException"/>.
/// NOTE(review): there is no visited-node guard here, so a cyclic graph would recurse without
/// bound unless workflow validation rejects cycles — confirm.
/// </remarks>
/// <param name="nodeConfig">Configuration of the node to run.</param>
/// <param name="context">Execution context shared by all nodes of this run.</param>
/// <param name="cancellationToken">Checked before each node and passed to the node itself.</param>
/// <exception cref="InvalidOperationException">
/// The node type is not registered, or the node failed and no error route exists.
/// </exception>
/// <exception cref="OperationCanceledException">Cancellation was requested before the node started.</exception>
private async Task ExecuteNodeRecursiveAsync(
    WorkflowNode nodeConfig,
    WorkflowExecutionContext context,
    CancellationToken cancellationToken)
{
    // Honour cancellation between nodes; individual nodes may not observe the token themselves.
    cancellationToken.ThrowIfCancellationRequested();

    // Get node implementation from registry
    var node = _nodeRegistry.GetNode(nodeConfig.Type);
    if (node == null)
    {
        throw new InvalidOperationException($"Node type '{nodeConfig.Type}' not registered");
    }

    // Execute the node
    var result = await node.ExecuteAsync(nodeConfig, context, cancellationToken);

    // Record execution history
    context.Execution.NodeExecutions.Add(result);

    // Store outputs for downstream nodes
    if (result.OutputData != null)
    {
        context.NodeOutputs[nodeConfig.Id] = result.OutputData;

        // Merge into shared context (later nodes overwrite earlier keys of the same name)
        foreach (var (key, value) in result.OutputData)
        {
            context.Data[key] = value;
        }
    }

    // Handle failure with error routing
    if (result.Status == NodeExecutionStatus.Failed)
    {
        var errorConnection = context.Workflow.Connections
            .FirstOrDefault(c => c.SourceNodeId == nodeConfig.Id &&
                                 c.SourceOutput == "error");
        if (errorConnection != null)
        {
            // Route to error handler
            var errorNode = context.Workflow.Nodes
                .FirstOrDefault(n => n.Id == errorConnection.TargetNodeId);
            if (errorNode != null)
            {
                await ExecuteNodeRecursiveAsync(errorNode, context, cancellationToken);
                return;
            }
        }

        // A specific exception type instead of bare System.Exception; the message is unchanged.
        throw new InvalidOperationException($"Node {nodeConfig.Id} failed: {result.ErrorMessage}");
    }

    // Find and execute downstream nodes; "error" connections are only used on failure.
    var outgoingConnections = context.Workflow.Connections
        .Where(c => c.SourceNodeId == nodeConfig.Id && c.SourceOutput != "error")
        .ToList();

    foreach (var connection in outgoingConnections)
    {
        // Skip connections whose condition is present and evaluates to false.
        if (!string.IsNullOrEmpty(connection.Condition) &&
            !EvaluateCondition(connection.Condition, context))
        {
            continue;
        }

        // Execute target node; a connection pointing at an unknown node id is ignored.
        var targetNode = context.Workflow.Nodes
            .FirstOrDefault(n => n.Id == connection.TargetNodeId);
        if (targetNode != null)
        {
            await ExecuteNodeRecursiveAsync(targetNode, context, cancellationToken);
        }
    }
}
它解释工作流程的定义并执行这些定义。
简单、易于理解和实现 · NodeRegistry · 自然的执行流程
/// <summary>
/// Maps node type names to implementation types and creates node instances on request.
/// </summary>
public class NodeRegistry : INodeRegistry
{
    private readonly Dictionary<string, Type> _nodeTypes = new();

    // NOTE(review): nothing in this excerpt assigns this field; the constructor that
    // receives the provider is presumably omitted from the listing — confirm.
    private readonly IServiceProvider _serviceProvider;

    /// <summary>
    /// Associates <paramref name="nodeType"/> with <typeparamref name="TNode"/>, replacing any
    /// earlier registration under the same name.
    /// </summary>
    public void RegisterNode<TNode>(string nodeType) where TNode : IWorkflowNode
        => _nodeTypes[nodeType] = typeof(TNode);

    /// <summary>
    /// Creates a node for <paramref name="nodeType"/>.
    /// </summary>
    /// <returns>
    /// The instance from the service provider when it supplies one, otherwise one created by
    /// <see cref="Activator.CreateInstance(Type)"/>; null when the name is not registered.
    /// </returns>
    public IWorkflowNode? GetNode(string nodeType)
    {
        if (_nodeTypes.TryGetValue(nodeType, out var implementation))
        {
            // Prefer the container; fall back to direct construction.
            var fromContainer = _serviceProvider.GetService(implementation) as IWorkflowNode;
            return fromContainer ?? Activator.CreateInstance(implementation) as IWorkflowNode;
        }

        return null;
    }
}
以后可以方便地添加并行执行
可以轻松添加自定义节点
/// <summary>
/// Workflow node that issues an HTTP request built from its resolved inputs
/// ("url", "method", "headers", "body") and outputs the status code, body and success flag.
/// </summary>
public class HttpRequestNode : BaseWorkflowNode
{
    // NOTE(review): no constructor is visible in this excerpt; the factory is presumably
    // injected by a constructor omitted from the listing — confirm.
    private readonly IHttpClientFactory _httpClientFactory;

    public override string NodeType => "HttpRequest";

    /// <summary>
    /// Sends the configured request and returns a success result carrying
    /// "statusCode", "body" and "isSuccess".
    /// </summary>
    /// <param name="nodeConfig">Node configuration whose inputs describe the request.</param>
    /// <param name="context">Execution context used to resolve templates in the inputs.</param>
    /// <param name="cancellationToken">Token passed to the HTTP call and the body read.</param>
    /// <exception cref="InvalidOperationException">The url is missing or the method is unsupported.</exception>
    public override async Task<NodeExecutionResult> ExecuteAsync(
        WorkflowNode nodeConfig,
        WorkflowExecutionContext context,
        CancellationToken cancellationToken = default)
    {
        var resolvedInputs = ResolveTemplates(nodeConfig.Inputs, context);

        var url = resolvedInputs.GetValueOrDefault("url")?.ToString();
        var method = resolvedInputs.GetValueOrDefault("method")?.ToString() ?? "GET";
        var headers = resolvedInputs.GetValueOrDefault("headers") as Dictionary<string, object>;
        var body = resolvedInputs.GetValueOrDefault("body");

        // Fail with a clear message rather than HttpClient's generic invalid-URI error.
        if (string.IsNullOrWhiteSpace(url))
        {
            throw new InvalidOperationException("HttpRequest node requires a non-empty 'url' input");
        }

        var client = _httpClientFactory.CreateClient();

        // Add headers
        if (headers != null)
        {
            foreach (var (key, value) in headers)
            {
                client.DefaultRequestHeaders.TryAddWithoutValidation(
                    key, value?.ToString() ?? string.Empty);
            }
        }

        // Make request. The method key is normalised with invariant casing so matching does
        // not depend on the current culture; the response is disposed when the method exits.
        using HttpResponseMessage response = method.ToUpperInvariant() switch
        {
            "GET" => await client.GetAsync(url, cancellationToken),
            "POST" => await client.PostAsJsonAsync(url, body, cancellationToken),
            "PUT" => await client.PutAsJsonAsync(url, body, cancellationToken),
            "DELETE" => await client.DeleteAsync(url, cancellationToken),
            _ => throw new InvalidOperationException($"Unsupported method: {method}")
        };

        var responseBody = await response.Content.ReadAsStringAsync(cancellationToken);

        var outputData = new Dictionary<string, object>
        {
            ["statusCode"] = (int)response.StatusCode,
            ["body"] = ParseBody(responseBody),
            ["isSuccess"] = response.IsSuccessStatusCode
        };

        return CreateSuccessResult(nodeConfig, outputData, resolvedInputs);
    }

    /// <summary>
    /// Returns the body parsed as a JSON object when possible, otherwise the raw text.
    /// </summary>
    private static object ParseBody(string responseBody)
    {
        // Empty bodies are not valid JSON; keep them as-is.
        if (string.IsNullOrWhiteSpace(responseBody))
        {
            return responseBody;
        }

        try
        {
            // Deserialize yields null for the JSON literal "null"; fall back to the raw text then.
            return (object?)JsonSerializer.Deserialize<Dictionary<string, object>>(responseBody)
                   ?? responseBody;
        }
        catch (JsonException)
        {
            // Arrays, scalars, HTML or plain text are not JSON objects: return the text unchanged.
            return responseBody;
        }
    }
}
依赖注入支持
{
"type": "HttpRequest",
"inputs": {
"url": "https://api.github.com/repos/{{owner}}/{{repo}}",
"method": "GET",
"headers": {
"Authorization": "Bearer {{apiToken}}",
"Accept": "application/vnd.github+json"
}
},
"outputs": {
"repoData": "{{body}}",
"statusCode": "{{statusCode}}"
}
}
内建节点
/// <summary>
/// Workflow node that applies one named operation to its "data" input and outputs the
/// outcome under the key "result".
/// </summary>
public class TransformNode : BaseWorkflowNode
{
    public override string NodeType => "Transform";

    /// <summary>
    /// Applies the "operation" input (uppercase, lowercase, trim, length, json_parse,
    /// json_stringify) to the "data" input; an unknown or missing operation passes the data
    /// through unchanged (or an empty string when there is no data).
    /// </summary>
    /// <param name="nodeConfig">Node configuration providing the inputs.</param>
    /// <param name="context">Execution context used to resolve templates in the inputs.</param>
    /// <param name="cancellationToken">Not used by this node.</param>
    public override async Task<NodeExecutionResult> ExecuteAsync(
        WorkflowNode nodeConfig,
        WorkflowExecutionContext context,
        CancellationToken cancellationToken = default)
    {
        var resolvedInputs = ResolveTemplates(nodeConfig.Inputs, context);

        var operation = resolvedInputs.GetValueOrDefault("operation")?.ToString();
        var inputData = resolvedInputs.GetValueOrDefault("data");

        // The operation name is an identifier, so it is normalised with invariant casing:
        // culture-sensitive lowering turns "TRIM" into "trım" under Turkish rules and the
        // match would silently fall through to the default arm.
        object result = operation?.ToLowerInvariant() switch
        {
            // The data itself keeps the original culture-aware casing calls.
            "uppercase" => inputData?.ToString()?.ToUpper() ?? string.Empty,
            "lowercase" => inputData?.ToString()?.ToLower() ?? string.Empty,
            "trim" => inputData?.ToString()?.Trim() ?? string.Empty,
            "length" => inputData?.ToString()?.Length ?? 0,
            // Deserialize returns null for the JSON literal "null"; substitute an empty map so
            // the non-nullable output dictionary never carries a null value.
            "json_parse" => JsonSerializer.Deserialize<Dictionary<string, object>>(
                                inputData?.ToString() ?? "{}")
                            ?? new Dictionary<string, object>(),
            "json_stringify" => JsonSerializer.Serialize(inputData),
            _ => inputData ?? string.Empty
        };

        var outputData = new Dictionary<string, object>
        {
            ["result"] = result
        };

        return CreateSuccessResult(nodeConfig, outputData, resolvedInputs);
    }
}
发起 HTTP API 调用，支持完整配置：
/// <summary>
/// Workflow node that waits for the number of milliseconds given by its "durationMs" input.
/// </summary>
public class DelayNode : BaseWorkflowNode
{
    public override string NodeType => "Delay";

    /// <summary>
    /// Waits for "durationMs" milliseconds (0 when the input is absent) and outputs
    /// "delayedMs" and the UTC completion time as "completedAt".
    /// </summary>
    /// <param name="nodeConfig">Node configuration providing the inputs.</param>
    /// <param name="context">Execution context used to resolve templates in the inputs.</param>
    /// <param name="cancellationToken">Cancels the wait.</param>
    /// <exception cref="FormatException">"durationMs" is not an integer.</exception>
    /// <exception cref="ArgumentOutOfRangeException">"durationMs" is negative.</exception>
    public override async Task<NodeExecutionResult> ExecuteAsync(
        WorkflowNode nodeConfig,
        WorkflowExecutionContext context,
        CancellationToken cancellationToken = default)
    {
        var resolvedInputs = ResolveTemplates(nodeConfig.Inputs, context);

        // Parse with the invariant culture: the value comes from workflow configuration,
        // not from user-facing text.
        var rawDuration = resolvedInputs.GetValueOrDefault("durationMs")?.ToString() ?? "0";
        var durationMs = int.Parse(rawDuration, System.Globalization.CultureInfo.InvariantCulture);

        // Task.Delay treats -1 as "wait indefinitely", which would hang the workflow, and
        // throws ArgumentOutOfRangeException for anything lower; reject every negative value.
        if (durationMs < 0)
        {
            throw new ArgumentOutOfRangeException(
                "durationMs", durationMs, "Delay duration must be zero or greater.");
        }

        await Task.Delay(durationMs, cancellationToken);

        var outputData = new Dictionary<string, object>
        {
            ["delayedMs"] = durationMs,
            ["completedAt"] = DateTime.UtcNow.ToString("O")
        };

        return CreateSuccessResult(nodeConfig, outputData, resolvedInputs);
    }
}
转换节点 · {{variable}} · 简单数据转换： · BaseWorkflowNode · 延迟节点
/// <summary>
/// Replaces every <c>{{name}}</c> placeholder in <paramref name="template"/> with the value
/// stored under <c>name</c> in the context data.
/// </summary>
/// <remarks>
/// Substitution happens in a single pass over the original template, so text that comes from
/// a substituted value is never scanned again: a value that itself contains
/// <c>{{other}}</c> is inserted literally rather than expanded a second time.
/// </remarks>
/// <param name="template">Text that may contain placeholders; null or empty is returned unchanged.</param>
/// <param name="context">Execution context whose data supplies the values.</param>
/// <returns>
/// The template with known placeholders replaced. Placeholders whose name is not present in
/// the data are left as written; a null value becomes the empty string.
/// </returns>
protected string ResolveTemplate(string template, WorkflowExecutionContext context)
{
    if (string.IsNullOrEmpty(template))
    {
        return template;
    }

    return Regex.Replace(template, @"\{\{([^}]+)\}\}", match =>
    {
        // Whitespace inside the braces is ignored: "{{ name }}" looks up "name".
        var variable = match.Groups[1].Value.Trim();

        return context.Data.TryGetValue(variable, out var value)
            ? (value?.ToString() ?? string.Empty)
            : match.Value;
    });
}
将延迟添加到工作流程中 :
{
"type": "HttpRequest",
"inputs": {
"url": "{{apiBaseUrl}}/users/{{userId}}/posts",
"headers": {
"Authorization": "Bearer {{authToken}}"
}
}
}
使用节点支持模板变量
/// <summary>
/// Database row for a stored workflow definition, mapped to the "workflow_definitions" table.
/// </summary>
[Table("workflow_definitions")]
public class WorkflowDefinitionEntity
{
    /// <summary>Primary key.</summary>
    [Key]
    public int Id { get; set; }

    /// <summary>Workflow identifier string; required, at most 100 characters.</summary>
    [Required, MaxLength(100)]
    public string WorkflowId { get; set; } = "";

    /// <summary>Workflow name; required, at most 200 characters.</summary>
    [Required, MaxLength(200)]
    public string Name { get; set; } = "";

    /// <summary>Definition JSON text, stored in a column of type "jsonb".</summary>
    [Column(TypeName = "jsonb")]
    public string DefinitionJson { get; set; } = "";

    /// <summary>Enabled flag; defaults to true.</summary>
    public bool IsEnabled { get; set; } = true;

    /// <summary>Creation timestamp; defaults to the UTC time at construction.</summary>
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;

    /// <summary>Last-update timestamp; defaults to the UTC time at construction.</summary>
    public DateTime UpdatedAt { get; set; } = DateTime.UtcNow;

    /// <summary>Executions associated with this definition; starts as an empty list.</summary>
    public ICollection<WorkflowExecutionEntity> Executions { get; set; }
        = new List<WorkflowExecutionEntity>();
}
语法。
我们使用 Entity Framework Core 和 PostgreSQL 实现持久化：
{
"id": "github-repo-workflow",
"name": "GitHub Repository Info Fetcher",
"startNodeId": "fetch-repo",
"nodes": [
{
"id": "fetch-repo",
"type": "HttpRequest",
"name": "Fetch Repository",
"inputs": {
"url": "https://api.github.com/repos/{{owner}}/{{repo}}",
"method": "GET",
"headers": {
"Accept": "application/vnd.github+json"
}
},
"outputs": {
"repoData": "{{body}}"
},
"position": { "x": 100, "y": 100 },
"style": { "backgroundColor": "#10B981", "icon": "🔍" }
},
{
"id": "extract-name",
"type": "Transform",
"name": "Extract Repo Name",
"inputs": {
"operation": "json_stringify",
"data": "{{repoData}}"
},
"position": { "x": 100, "y": 250 },
"style": { "backgroundColor": "#3B82F6", "icon": "🔄" }
}
],
"connections": [
{
"id": "conn-1",
"sourceNodeId": "fetch-repo",
"targetNodeId": "extract-name",
"sourceOutput": "default",
"label": "On Success"
}
],
"variables": {
"owner": "scottgal",
"repo": "mostlylucidweb"
}
}
灵活：无需迁移，工作流定义可以随时演进；快速：PostgreSQL 的 JSONB 支持索引和查询；简单：不需要复杂的关系映射。
工作流示例
我们会用 HTMX、Alpine.js、TailwindCSS 和 DaisyUI 来构建可视化编辑器。
一个美丽的、主题可调和的 UI
结论
灵活节点型建筑
Mostlylucid.Workflow.Shared/Models/ · Mostlylucid.Workflow.Engine/ · Mostlylucid.Shared/Entities/ · 数据库持久化
© 2026 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.