a 利用HTMX和ASP.NET核心建立工作流程系统----第二部分:建筑和核心引擎 (中文 (Chinese Simplified))

a 利用HTMX和ASP.NET核心建立工作流程系统----第二部分:建筑和核心引擎

Wednesday, 15 January 2025

//

8 minute read

一. 导言 导言 导言 导言 导言 导言 一,导言 导言 导言 导言 导言 导言

第一部分 第一部分我们引入了建立定制工作流程系统的概念。

现在是时候让我们的手弄脏了!

  • 建立核心工作流程引擎 也就是我们执行基于节点的工作流程的系统的核心
  • 到这个职位结束时,你会有:
  • 坚固的项目结构
  • 核心工作流程模式(节点、连接、定义)
  • 一个工作执行引擎

数据库持久性

多个内置节点类型

Mostlylucid.Workflow.Shared/        # Shared models and DTOs
├── Models/
│   ├── WorkflowNode.cs              # Node definition
│   ├── NodeConnection.cs            # Connections between nodes
│   ├── WorkflowDefinition.cs        # Complete workflow definition
│   └── WorkflowExecution.cs         # Execution tracking

Mostlylucid.Workflow.Engine/         # Core execution engine
├── Interfaces/
│   ├── IWorkflowNode.cs             # Node interface
│   ├── IWorkflowExecutor.cs         # Executor interface
│   └── INodeRegistry.cs             # Node registry interface
├── Execution/
│   ├── NodeRegistry.cs              # Registry for node types
│   └── WorkflowExecutor.cs          # Main execution engine
└── Nodes/
    ├── BaseWorkflowNode.cs          # Base node implementation
    ├── HttpRequestNode.cs           # HTTP API calls
    ├── TransformNode.cs             # Data transformation
    └── DelayNode.cs                 # Delay execution

Mostlylucid.Shared/Entities/         # Database entities (EF Core)
├── WorkflowDefinitionEntity.cs
├── WorkflowExecutionEntity.cs
└── WorkflowTriggerStateEntity.cs

项目结构

我们已经将我们的解决方案 组织为重点项目 以维护:

核心模型核心模型WorkflowNode工作流量

public class WorkflowNode
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string Type { get; set; } = string.Empty;
    public string Name { get; set; } = string.Empty;
    public string? Description { get; set; }

    // Configuration
    public Dictionary<string, object> Inputs { get; set; } = new();
    public Dictionary<string, string> Outputs { get; set; } = new();
    public Dictionary<string, string> Conditions { get; set; } = new();

    // Visual properties
    public NodePosition Position { get; set; } = new();
    public NodeStyle Style { get; set; } = new();
}

public class NodeStyle
{
    public string BackgroundColor { get; set; } = "#3B82F6";
    public string TextColor { get; set; } = "#FFFFFF";
    public string BorderColor { get; set; } = "#2563EB";
    public string? Icon { get; set; }
    public int Width { get; set; } = 200;
    public int Height { get; set; } = 100;
}

缩略

  • **是所有工作流程的构件。**它的设计完全可以连载到JSON:
  • **关键设计决定:**作为词典输入
  • :灵活的关键价值配置视觉属性{{variable}}:节点知道如何自我塑造

模板支持

:值可用

public class NodeConnection
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string SourceNodeId { get; set; } = string.Empty;
    public string TargetNodeId { get; set; } = string.Empty;
    public string SourceOutput { get; set; } = "default";
    public string TargetInput { get; set; } = "default";
    public string? Condition { get; set; }
    public string? Label { get; set; }
}

动态数据语法

  • 结对对
  • 连接定义节点之间数据流的方式 :
  • 允许:

每个节点的多个输出( 成功、 错误、 有条件的分支)

为清晰度而命名的投入/产出

public class WorkflowDefinition
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string Name { get; set; } = string.Empty;
    public string? Description { get; set; }
    public int Version { get; set; } = 1;

    public List<WorkflowNode> Nodes { get; set; } = new();
    public List<NodeConnection> Connections { get; set; } = new();

    public string? StartNodeId { get; set; }
    public List<string> Tags { get; set; } = new();
    public bool IsEnabled { get; set; } = true;

    public Dictionary<string, object>? Variables { get; set; }
}

有条件连接 (“ 如果 X = Y 仅连接 )

工作流量WorkflowExecutor完整的工作流程:

工作流程执行器

public async Task<WorkflowExecution> ExecuteAsync(
    WorkflowDefinition workflow,
    Dictionary<string, object>? inputData = null,
    string? triggeredBy = null,
    CancellationToken cancellationToken = default)
{
    var execution = new WorkflowExecution
    {
        Id = Guid.NewGuid().ToString(),
        WorkflowId = workflow.Id,
        Status = WorkflowExecutionStatus.Running,
        StartedAt = DateTime.UtcNow,
        InputData = inputData,
        Context = new Dictionary<string, object>(inputData ?? new())
    };

    try
    {
        // Validate workflow
        var validationErrors = ValidateWorkflow(workflow);
        if (validationErrors.Any())
        {
            throw new InvalidOperationException(
                $"Workflow validation failed: {string.Join(", ", validationErrors)}");
        }

        // Create execution context
        var context = new WorkflowExecutionContext
        {
            Execution = execution,
            Workflow = workflow,
            Data = execution.Context,
            Services = _serviceProvider
        };

        // Find and execute start node
        var startNode = workflow.Nodes.FirstOrDefault(n => n.Id == workflow.StartNodeId);
        if (startNode == null)
        {
            throw new InvalidOperationException("No start node found");
        }

        await ExecuteNodeRecursiveAsync(startNode, context, cancellationToken);

        execution.Status = WorkflowExecutionStatus.Completed;
        execution.CompletedAt = DateTime.UtcNow;
        execution.OutputData = context.Data;
    }
    catch (Exception ex)
    {
        execution.Status = WorkflowExecutionStatus.Failed;
        execution.ErrorMessage = ex.Message;
        // ... error handling
    }

    return execution;
}

缩略

是我们系统的大脑。

private async Task ExecuteNodeRecursiveAsync(
    WorkflowNode nodeConfig,
    WorkflowExecutionContext context,
    CancellationToken cancellationToken)
{
    // Get node implementation from registry
    var node = _nodeRegistry.GetNode(nodeConfig.Type);
    if (node == null)
    {
        throw new InvalidOperationException($"Node type '{nodeConfig.Type}' not registered");
    }

    // Execute the node
    var result = await node.ExecuteAsync(nodeConfig, context, cancellationToken);

    // Record execution history
    context.Execution.NodeExecutions.Add(result);

    // Store outputs for downstream nodes
    if (result.OutputData != null)
    {
        context.NodeOutputs[nodeConfig.Id] = result.OutputData;

        // Merge into shared context
        foreach (var (key, value) in result.OutputData)
        {
            context.Data[key] = value;
        }
    }

    // Handle failure with error routing
    if (result.Status == NodeExecutionStatus.Failed)
    {
        var errorConnection = context.Workflow.Connections
            .FirstOrDefault(c => c.SourceNodeId == nodeConfig.Id &&
                                 c.SourceOutput == "error");

        if (errorConnection != null)
        {
            // Route to error handler
            var errorNode = context.Workflow.Nodes
                .FirstOrDefault(n => n.Id == errorConnection.TargetNodeId);
            if (errorNode != null)
            {
                await ExecuteNodeRecursiveAsync(errorNode, context, cancellationToken);
                return;
            }
        }

        throw new Exception($"Node {nodeConfig.Id} failed: {result.ErrorMessage}");
    }

    // Find and execute downstream nodes
    var outgoingConnections = context.Workflow.Connections
        .Where(c => c.SourceNodeId == nodeConfig.Id && c.SourceOutput != "error")
        .ToList();

    foreach (var connection in outgoingConnections)
    {
        // Check connection condition
        if (!string.IsNullOrEmpty(connection.Condition))
        {
            if (!EvaluateCondition(connection.Condition, context))
            {
                continue; // Skip this connection
            }
        }

        // Execute target node
        var targetNode = context.Workflow.Nodes
            .FirstOrDefault(n => n.Id == connection.TargetNodeId);
        if (targetNode != null)
        {
            await ExecuteNodeRecursiveAsync(targetNode, context, cancellationToken);
        }
    }
}

它解释工作流程的定义并执行这些定义。

  • 核心执行逻辑
  • 递归节点执行
  • 执行模式的关键是重现
  • 每个节点执行,然后触发下游节点:

为什么递归?

简单易理解和实施NodeRegistry自然执行流量

public class NodeRegistry : INodeRegistry
{
    private readonly Dictionary<string, Type> _nodeTypes = new();
    private readonly IServiceProvider _serviceProvider;

    public void RegisterNode<TNode>(string nodeType) where TNode : IWorkflowNode
    {
        _nodeTypes[nodeType] = typeof(TNode);
    }

    public IWorkflowNode? GetNode(string nodeType)
    {
        if (!_nodeTypes.TryGetValue(nodeType, out var type))
        {
            return null;
        }

        // Try DI first, fallback to Activator
        return _serviceProvider.GetService(type) as IWorkflowNode
               ?? Activator.CreateInstance(type) as IWorkflowNode;
    }
}

以后容易添加平行执行

  • 处理任意图形结构
  • 节点登记簿
  • 缩略

允许节点类型的动态注册 :

本设计允许:

简单添加自定义节点

public class HttpRequestNode : BaseWorkflowNode
{
    private readonly IHttpClientFactory _httpClientFactory;

    public override string NodeType => "HttpRequest";

    public override async Task<NodeExecutionResult> ExecuteAsync(
        WorkflowNode nodeConfig,
        WorkflowExecutionContext context,
        CancellationToken cancellationToken = default)
    {
        var resolvedInputs = ResolveTemplates(nodeConfig.Inputs, context);

        var url = resolvedInputs.GetValueOrDefault("url")?.ToString();
        var method = resolvedInputs.GetValueOrDefault("method")?.ToString() ?? "GET";
        var headers = resolvedInputs.GetValueOrDefault("headers") as Dictionary<string, object>;
        var body = resolvedInputs.GetValueOrDefault("body");

        var client = _httpClientFactory.CreateClient();

        // Add headers
        if (headers != null)
        {
            foreach (var (key, value) in headers)
            {
                client.DefaultRequestHeaders.TryAddWithoutValidation(
                    key, value?.ToString() ?? string.Empty);
            }
        }

        // Make request
        HttpResponseMessage response = method.ToUpper() switch
        {
            "GET" => await client.GetAsync(url, cancellationToken),
            "POST" => await client.PostAsJsonAsync(url, body, cancellationToken),
            "PUT" => await client.PutAsJsonAsync(url, body, cancellationToken),
            "DELETE" => await client.DeleteAsync(url, cancellationToken),
            _ => throw new InvalidOperationException($"Unsupported method: {method}")
        };

        var responseBody = await response.Content.ReadAsStringAsync(cancellationToken);

        var outputData = new Dictionary<string, object>
        {
            ["statusCode"] = (int)response.StatusCode,
            ["body"] = JsonSerializer.Deserialize<Dictionary<string, object>>(responseBody)
                       ?? responseBody,
            ["isSuccess"] = response.IsSuccessStatusCode
        };

        return CreateSuccessResult(nodeConfig, outputData, resolvedInputs);
    }
}

依赖注射支持

{
  "type": "HttpRequest",
  "inputs": {
    "url": "https://api.github.com/repos/{{owner}}/{{repo}}",
    "method": "GET",
    "headers": {
      "Authorization": "Bearer {{apiToken}}",
      "Accept": "application/vnd.github+json"
    }
  },
  "outputs": {
    "repoData": "{{body}}",
    "statusCode": "{{statusCode}}"
  }
}

运行时节点发现

内建节点

public class TransformNode : BaseWorkflowNode
{
    public override string NodeType => "Transform";

    public override async Task<NodeExecutionResult> ExecuteAsync(
        WorkflowNode nodeConfig,
        WorkflowExecutionContext context,
        CancellationToken cancellationToken = default)
    {
        var resolvedInputs = ResolveTemplates(nodeConfig.Inputs, context);
        var operation = resolvedInputs.GetValueOrDefault("operation")?.ToString();
        var inputData = resolvedInputs.GetValueOrDefault("data");

        object result = operation?.ToLower() switch
        {
            "uppercase" => inputData?.ToString()?.ToUpper() ?? string.Empty,
            "lowercase" => inputData?.ToString()?.ToLower() ?? string.Empty,
            "trim" => inputData?.ToString()?.Trim() ?? string.Empty,
            "length" => inputData?.ToString()?.Length ?? 0,
            "json_parse" => JsonSerializer.Deserialize<Dictionary<string, object>>(
                inputData?.ToString() ?? "{}"),
            "json_stringify" => JsonSerializer.Serialize(inputData),
            _ => inputData ?? string.Empty
        };

        var outputData = new Dictionary<string, object>
        {
            ["result"] = result
        };

        return CreateSuccessResult(nodeConfig, outputData, resolvedInputs);
    }
}

Http 请求节点

使 HTTP API 电话配置完整 :

public class DelayNode : BaseWorkflowNode
{
    public override string NodeType => "Delay";

    public override async Task<NodeExecutionResult> ExecuteAsync(
        WorkflowNode nodeConfig,
        WorkflowExecutionContext context,
        CancellationToken cancellationToken = default)
    {
        var resolvedInputs = ResolveTemplates(nodeConfig.Inputs, context);

        var durationMs = int.Parse(
            resolvedInputs.GetValueOrDefault("durationMs")?.ToString() ?? "0");

        await Task.Delay(durationMs, cancellationToken);

        var outputData = new Dictionary<string, object>
        {
            ["delayedMs"] = durationMs,
            ["completedAt"] = DateTime.UtcNow.ToString("O")
        };

        return CreateSuccessResult(nodeConfig, outputData, resolvedInputs);
    }
}

示例使用率:

变换节点{{variable}}简单数据转换:BaseWorkflowNode延迟节点

protected string ResolveTemplate(string template, WorkflowExecutionContext context)
{
    if (string.IsNullOrEmpty(template)) return template;

    var result = template;
    var matches = Regex.Matches(template, @"\{\{([^}]+)\}\}");

    foreach (Match match in matches)
    {
        var variable = match.Groups[1].Value.Trim();
        if (context.Data.TryGetValue(variable, out var value))
        {
            result = result.Replace(match.Value, value?.ToString() ?? string.Empty);
        }
    }

    return result;
}

将延迟添加到工作流程中 :

{
  "type": "HttpRequest",
  "inputs": {
    "url": "{{apiBaseUrl}}/users/{{userId}}/posts",
    "headers": {
      "Authorization": "Bearer {{authToken}}"
    }
  }
}

模板模板系统

使用节点支持模板变量

[Table("workflow_definitions")]
public class WorkflowDefinitionEntity
{
    [Key]
    public int Id { get; set; }

    [Required]
    [MaxLength(100)]
    public string WorkflowId { get; set; } = string.Empty;

    [Required]
    [MaxLength(200)]
    public string Name { get; set; } = string.Empty;

    [Column(TypeName = "jsonb")]
    public string DefinitionJson { get; set; } = string.Empty;

    public bool IsEnabled { get; set; } = true;
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
    public DateTime UpdatedAt { get; set; } = DateTime.UtcNow;

    public ICollection<WorkflowExecutionEntity> Executions { get; set; } = new List<WorkflowExecutionEntity>();
}

语法。

  • 缩略
  • 提供助手方法 :
  • 这样就可以有强大的动态工作流程:

数据库持久性

我们使用实体框架核心与 PostgreSQL 用于持久性:

{
  "id": "github-repo-workflow",
  "name": "GitHub Repository Info Fetcher",
  "startNodeId": "fetch-repo",
  "nodes": [
    {
      "id": "fetch-repo",
      "type": "HttpRequest",
      "name": "Fetch Repository",
      "inputs": {
        "url": "https://api.github.com/repos/{{owner}}/{{repo}}",
        "method": "GET",
        "headers": {
          "Accept": "application/vnd.github+json"
        }
      },
      "outputs": {
        "repoData": "{{body}}"
      },
      "position": { "x": 100, "y": 100 },
      "style": { "backgroundColor": "#10B981", "icon": "🔍" }
    },
    {
      "id": "extract-name",
      "type": "Transform",
      "name": "Extract Repo Name",
      "inputs": {
        "operation": "json_stringify",
        "data": "{{repoData}}"
      },
      "position": { "x": 100, "y": 250 },
      "style": { "backgroundColor": "#3B82F6", "icon": "🔄" }
    }
  ],
  "connections": [
    {
      "id": "conn-1",
      "sourceNodeId": "fetch-repo",
      "targetNodeId": "extract-name",
      "sourceOutput": "default",
      "label": "On Success"
    }
  ],
  "variables": {
    "owner": "scottgal",
    "repo": "mostlylucidweb"
  }
}

为什么是JSONB?

灵活:不移徙,工作流程的定义可以演变快速: PostgreSQL 的 JSONB 索引和可查询简单: 不需要复杂的关系映射

工作流量实例

  • 以下是一个完整的工作流程, 获取 GitHub repo 数据并转换数据 :
  • 下一个是什么?
  • 我们现在有一个功能齐全的工作流程引擎!
  • 但它只是程序上无障碍的。

第三部分 第三部分

我们会用HTMX、Alpine.js、TackwindCSS 和DaisyUI 来建立视觉编辑。

  • 我们将创造:
  • 用于节点的拖放画布
  • 视觉连接绘图( 思考“ 哑节点- RED ” )
  • 节点配置面板
  • 工作流程执行监测

一个美丽的、主题可调和的 UI

结论 结论 结论 结论 结论

我们建立了我们工作流程系统的核心:

灵活节点型建筑

  • 递递递执行引擎Mostlylucid.Workflow.Shared/Models/
  • 模板可变系统Mostlylucid.Workflow.Engine/
  • 多个内嵌节点类型Mostlylucid.Shared/Entities/

数据库持久性

Finding related posts...
logo

© 2026 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.