a 利用HTMX和ASP.NET核心建立工作流程系统----第4部分:营火整合与自动化 (中文 (Chinese Simplified))

a 利用HTMX和ASP.NET核心建立工作流程系统----第4部分:营火整合与自动化

Wednesday, 15 January 2025

//

9 minute read

一. 导言 导言 导言 导言 导言 导言 一,导言 导言 导言 导言 导言 导言

第三部分 第三部分我们建造了一个美丽的视觉编辑

  • **但我们的工作流程只有在我们手动触发时才能运行。**在最后这篇文章中, 我们会让工作流程真正自主,
  • 计划执行- 按照时间表运行工作流程
  • AIP 投票投票- 监测外部广告信息并触发变化
  • 国家管理- 跨处决的轨道触发状态

仪表板

  • 监测所有背景工作

  • 为什么要着火?

  • 绞刑是适合我们需要的 因为它:

  • 我们现有PostgreSQL数据库的仓库工作

  • 提供内置仪式仪表板

  • 支持经常性工作

具有自动重试逻辑

水平水平缩平

[Table("workflow_trigger_states")]
public class WorkflowTriggerStateEntity
{
    public int Id { get; set; }
    public int WorkflowDefinitionId { get; set; }

    // Type: "Schedule", "ApiPoll", "Webhook"
    public string TriggerType { get; set; } = string.Empty;

    // Configuration as JSON
    public string ConfigJson { get; set; } = "{}";

    // Current state as JSON (stores last poll time, content hash, etc.)
    public string StateJson { get; set; } = "{}";

    public bool IsEnabled { get; set; } = true;
    public DateTime? LastCheckedAt { get; set; }
    public DateTime? LastFiredAt { get; set; }
    public int FireCount { get; set; } = 0;
    public string? LastError { get; set; }
}

触发国模式

  • 首先,让我们理解一下我们的触发国实体(我们已经在第二部分中创建了这个实体):
  • 这个实体跟踪关于工作流程触发点的一切:
  • 当它最后一次运行时
  • 其配置是什么

它的状态( 用于有目的的触发)

发生的任何错误

public class ScheduleTriggerConfig
{
    public string IntervalType { get; set; } = "minutes"; // minutes, hours, days
    public int IntervalValue { get; set; } = 60;
    public Dictionary<string, object>? InputData { get; set; }
}

工 工 工 流量

public class WorkflowSchedulerJob
{
    private readonly MostlylucidDbContext _context;
    private readonly WorkflowExecutionService _executionService;
    private readonly ILogger<WorkflowSchedulerJob> _logger;

    [AutomaticRetry(Attempts = 3)]
    public async Task ExecuteScheduledWorkflowsAsync()
    {
        _logger.LogInformation("Checking for scheduled workflows");

        // Get all enabled schedule triggers
        var triggers = await _context.WorkflowTriggerStates
            .Include(t => t.WorkflowDefinition)
            .Where(t => t.IsEnabled && t.TriggerType == "Schedule")
            .ToListAsync();

        foreach (var trigger in triggers)
        {
            try
            {
                var config = JsonSerializer.Deserialize<ScheduleTriggerConfig>(
                    trigger.ConfigJson);

                if (config == null) continue;

                // Check if it's time to run
                if (!ShouldRunScheduledWorkflow(trigger, config))
                    continue;

                _logger.LogInformation(
                    "Executing scheduled workflow {WorkflowId}",
                    trigger.WorkflowDefinition.WorkflowId);

                // Execute the workflow
                await _executionService.ExecuteWorkflowAsync(
                    trigger.WorkflowDefinition.WorkflowId,
                    config.InputData,
                    "Scheduler");

                // Update trigger state
                trigger.LastCheckedAt = DateTime.UtcNow;
                trigger.LastFiredAt = DateTime.UtcNow;
                trigger.FireCount++;

                var state = JsonSerializer.Deserialize<Dictionary<string, object>>(
                                trigger.StateJson) ?? new();
                state["lastRun"] = DateTime.UtcNow.ToString("O");
                trigger.StateJson = JsonSerializer.Serialize(state);

                await _context.SaveChangesAsync();
            }
            catch (Exception ex)
            {
                _logger.LogError(ex,
                    "Error executing scheduled workflow {TriggerId}",
                    trigger.Id);
                trigger.LastError = ex.Message;
                await _context.SaveChangesAsync();
            }
        }
    }

    private bool ShouldRunScheduledWorkflow(
        WorkflowTriggerStateEntity trigger,
        ScheduleTriggerConfig config)
    {
        // First run?
        if (!trigger.LastFiredAt.HasValue)
            return true;

        var timeSinceLastRun = DateTime.UtcNow - trigger.LastFiredAt.Value;

        return config.IntervalType.ToLower() switch
        {
            "minutes" => timeSinceLastRun.TotalMinutes >= config.IntervalValue,
            "hours" => timeSinceLastRun.TotalHours >= config.IntervalValue,
            "days" => timeSinceLastRun.TotalDays >= config.IntervalValue,
            _ => false
        };
    }
}

配置模型

  1. 调度工作ExecuteScheduledWorkflowsAsync()
  2. 如何运作:
  3. 每分每分每秒, 燃火呼叫
  4. 我们查询已启用的时间表触发器
  5. 每个触发器, 检查是否已经过了足够时间

如果是,执行工作流程

以上次运行时间更新触发状态

APP 投票

public class ApiPollTriggerConfig
{
    public string Url { get; set; } = string.Empty;
    public int IntervalSeconds { get; set; } = 300; // 5 minutes
    public bool AlwaysTrigger { get; set; } = false;
    public Dictionary<string, string>? Headers { get; set; }
}

IPI民调更有趣, 我们监测外部API, 当内容改变时会触发工作流程!

[AutomaticRetry(Attempts = 3)]
public async Task PollApiTriggersAsync()
{
    _logger.LogInformation("Polling API triggers");

    var triggers = await _context.WorkflowTriggerStates
        .Include(t => t.WorkflowDefinition)
        .Where(t => t.IsEnabled && t.TriggerType == "ApiPoll")
        .ToListAsync();

    foreach (var trigger in triggers)
    {
        try
        {
            var config = JsonSerializer.Deserialize<ApiPollTriggerConfig>(
                trigger.ConfigJson);

            if (config == null) continue;

            // Check if it's time to poll
            if (trigger.LastCheckedAt.HasValue)
            {
                var timeSinceLastCheck = DateTime.UtcNow - trigger.LastCheckedAt.Value;
                if (timeSinceLastCheck.TotalSeconds < config.IntervalSeconds)
                    continue;
            }

            _logger.LogInformation("Polling API for workflow {WorkflowId}",
                trigger.WorkflowDefinition.WorkflowId);

            // Poll the API
            using var httpClient = new HttpClient();
            var response = await httpClient.GetAsync(config.Url);
            var content = await response.Content.ReadAsStringAsync();

            // Get previous state
            var state = JsonSerializer.Deserialize<Dictionary<string, object>>(
                            trigger.StateJson) ?? new();

            var previousHash = state.GetValueOrDefault("contentHash")?.ToString();
            var currentHash = ComputeHash(content);

            // Has content changed?
            if (previousHash != currentHash || config.AlwaysTrigger)
            {
                _logger.LogInformation(
                    "API content changed, triggering workflow {WorkflowId}",
                    trigger.WorkflowDefinition.WorkflowId);

                // Pass response as input to workflow
                var inputData = new Dictionary<string, object>
                {
                    ["apiResponse"] = content,
                    ["statusCode"] = (int)response.StatusCode,
                    ["previousHash"] = previousHash ?? string.Empty,
                    ["currentHash"] = currentHash
                };

                // Execute the workflow
                await _executionService.ExecuteWorkflowAsync(
                    trigger.WorkflowDefinition.WorkflowId,
                    inputData,
                    $"ApiPoll:{config.Url}");

                trigger.LastFiredAt = DateTime.UtcNow;
                trigger.FireCount++;

                // Update state
                state["contentHash"] = currentHash;
                state["lastContent"] = content.Length > 1000
                    ? content.Substring(0, 1000)
                    : content;
                state["lastPoll"] = DateTime.UtcNow.ToString("O");
            }

            trigger.LastCheckedAt = DateTime.UtcNow;
            trigger.StateJson = JsonSerializer.Serialize(state);
            trigger.LastError = null;

            await _context.SaveChangesAsync();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error polling API trigger {TriggerId}",
                trigger.Id);
            trigger.LastError = ex.Message;
            trigger.LastCheckedAt = DateTime.UtcNow;
            await _context.SaveChangesAsync();
        }
    }
}

private string ComputeHash(string content)
{
    using var sha256 = System.Security.Cryptography.SHA256.Create();
    var bytes = System.Text.Encoding.UTF8.GetBytes(content);
    var hash = sha256.ComputeHash(bytes);
    return Convert.ToBase64String(hash);
}

配置模型

  1. 投票工作
  2. 如何运作:
  3. 每分钟检查所有API投票的触发器
  4. 每个触发器,检查自上次投票以来是否已经过了足够时间
  5. 投放配置的 URL
  6. 计算回复内容的散列AlwaysTrigger与以前储存在州内的散列相比
  7. 更改(或)
  8. 执行工作流程

将 API 回复作为输入数据传送到工作流程

以新的散列更新状态

{
  "triggerType": "ApiPoll",
  "config": {
    "url": "https://api.github.com/repos/dotnet/aspnetcore/releases/latest",
    "intervalSeconds": 3600,
    "alwaysTrigger": false
  }
}

使用案例示例

监视 GitHub 释放 :

这是每小时一次的GitHub API民调。Program.cs当发布新版本时, 内容会更改散列, 而工作流程会随发布数据执行 !

// Add Hangfire services
builder.Services.AddHangfire(config =>
{
    config.UsePostgreSqlStorage(
        builder.Configuration.GetConnectionString("DefaultConnection"));
});

builder.Services.AddHangfireServer();

// Register our job
builder.Services.AddScoped<WorkflowSchedulerJob>();

挂牌点火工作

app.UseHangfireDashboard("/hangfire");

// Register recurring jobs
RecurringJob.AddOrUpdate<WorkflowSchedulerJob>(
    "scheduled-workflows",
    job => job.ExecuteScheduledWorkflowsAsync(),
    Cron.Minutely);

RecurringJob.AddOrUpdate<WorkflowSchedulerJob>(
    "api-poll-triggers",
    job => job.PollApiTriggersAsync(),
    Cron.Minutely);

在你的

启动配置 :/hangfire:

  • **然后,在程序启动后, 登记重复性工作:**死火套板
  • 绞火包括一个内置仪内仪表仪表仪表板,可在工作职位
  • : 查看所有排队、处理和完成的工作经常性工作
  • :管理我们的工作流程调度器重试

: 查看和重试失败的任务

app.UseHangfireDashboard("/hangfire", new DashboardOptions
{
    Authorization = new[]
    {
        new HangfireAuthorizationFilter()
    }
});

public class HangfireAuthorizationFilter : IDashboardAuthorizationFilter
{
    public bool Authorize(DashboardContext context)
    {
        var httpContext = context.GetHttpContext();

        // Only allow authenticated users
        return httpContext.User.Identity?.IsAuthenticated == true;
    }
}

服务器服务器

监视燃火服务器

[HttpPost("workflow/{id}/triggers")]
public async Task<IActionResult> CreateTrigger(
    string id,
    [FromBody] TriggerCreateRequest request)
{
    var workflow = await _context.WorkflowDefinitions
        .FirstOrDefaultAsync(w => w.WorkflowId == id);

    if (workflow == null)
        return NotFound();

    var trigger = new WorkflowTriggerStateEntity
    {
        WorkflowDefinitionId = workflow.Id,
        TriggerType = request.Type,
        ConfigJson = JsonSerializer.Serialize(request.Config),
        StateJson = "{}",
        IsEnabled = true
    };

    await _context.WorkflowTriggerStates.AddAsync(trigger);
    await _context.SaveChangesAsync();

    return Json(new { success = true, triggerId = trigger.Id });
}

public class TriggerCreateRequest
{
    public string Type { get; set; } = string.Empty; // Schedule, ApiPoll
    public object Config { get; set; } = new();
}

保护仪表板的安全

<div class="card bg-base-100 shadow-xl">
    <div class="card-body">
        <h2 class="card-title">⏰ Add Trigger</h2>

        <div class="form-control">
            <label class="label">Trigger Type</label>
            <select class="select select-bordered" x-model="triggerType">
                <option value="Schedule">Schedule</option>
                <option value="ApiPoll">API Poll</option>
            </select>
        </div>

        <!-- Schedule Config -->
        <template x-if="triggerType === 'Schedule'">
            <div class="space-y-4">
                <div class="form-control">
                    <label class="label">Interval</label>
                    <div class="flex gap-2">
                        <input type="number"
                               x-model="scheduleConfig.intervalValue"
                               class="input input-bordered flex-1" />
                        <select x-model="scheduleConfig.intervalType"
                                class="select select-bordered">
                            <option value="minutes">Minutes</option>
                            <option value="hours">Hours</option>
                            <option value="days">Days</option>
                        </select>
                    </div>
                </div>
            </div>
        </template>

        <!-- API Poll Config -->
        <template x-if="triggerType === 'ApiPoll'">
            <div class="space-y-4">
                <div class="form-control">
                    <label class="label">API URL</label>
                    <input type="url"
                           x-model="apiConfig.url"
                           class="input input-bordered"
                           placeholder="https://api.example.com/data" />
                </div>

                <div class="form-control">
                    <label class="label">Poll Interval (seconds)</label>
                    <input type="number"
                           x-model="apiConfig.intervalSeconds"
                           class="input input-bordered"
                           value="300" />
                </div>
            </div>
        </template>

        <button @click="createTrigger()" class="btn btn-primary mt-4">
            Create Trigger
        </button>
    </div>
</div>

通过 UI 管理触发器

让我们添加用于创建和管理触发器的UI:

  1. UI UI 构成部分构成部分
  2. 实际世界实例
  3. 让我们建立一个完整的自动化工作流程,以便:
  4. 将GitHub API投放新释放

检查版本是否比我们所看到的更新

{
  "name": "GitHub Release Monitor",
  "startNodeId": "parse-data",
  "nodes": [
    {
      "id": "parse-data",
      "type": "Transform",
      "name": "Extract Version",
      "inputs": {
        "operation": "json_parse",
        "data": "{{apiResponse}}"
      }
    },
    {
      "id": "log-release",
      "type": "Log",
      "name": "Log New Release",
      "inputs": {
        "message": "New release: {{tag_name}} - {{name}}",
        "level": "info"
      }
    }
  ],
  "connections": [
    {
      "sourceNodeId": "parse-data",
      "targetNodeId": "log-release"
    }
  ]
}

日志消息

{
  "type": "ApiPoll",
  "config": {
    "url": "https://api.github.com/repos/dotnet/aspnetcore/releases/latest",
    "intervalSeconds": 3600
  }
}

(可以发送电子邮件、邮寄至Slack等)

  1. 第1步:创建工作流程
  2. 步骤2:创建 API 标票触发器
  3. 现在,每小时, 营火将:
  4. 将GitHub API扑灭

将内容( hash) 与前一次民调比较

如果更改, 执行工作流程

工作流程解析 JSON 并记录发布信息

_logger.LogInformation(
    "Workflow {WorkflowId} execution {ExecutionId} completed in {Duration}ms with status {Status}",
    execution.WorkflowId,
    execution.Id,
    execution.DurationMs,
    execution.Status);

监测和观察

伐木

private static readonly Counter WorkflowExecutions = Metrics
    .CreateCounter("workflow_executions_total",
        "Total workflow executions",
        new CounterConfiguration
        {
            LabelNames = new[] { "workflow_id", "status" }
        });

// In execution service
WorkflowExecutions
    .WithLabels(workflow.Id, execution.Status.ToString())
    .Inc();

所有工作流程执行都记录如下:

计量数

  • 我们可以加上普罗米修斯的衡量标准:
  • 警报警报
  • 设置下列警报:
  • 失败的工作流程 (现状 : 失败)

工作流动时间太长

AIP 投票失败

尚未在预期的时间内发射的触发器

绩效考量

// Instead of querying per trigger
var triggers = await _context.WorkflowTriggerStates
    .Include(t => t.WorkflowDefinition)
    .Where(t => t.IsEnabled && t.TriggerType == "ApiPoll")
    .AsNoTracking() // Read-only
    .ToListAsync();

数据库负载

由于许多工作流程的民意测验频繁进行,数据库载重可能很大:

解决方案: 批量查询

catch (HttpRequestException ex) when (ex.StatusCode == HttpStatusCode.TooManyRequests)
{
    // Back off
    var retryAfter = response.Headers.RetryAfter?.Delta ?? TimeSpan.FromMinutes(5);
    state["backoffUntil"] = DateTime.UtcNow.Add(retryAfter).ToString("O");
}

AIP 利率限制

在对外投票时:

解决方案: 指数回扣

public class ConditionalTriggerConfig : ApiPollTriggerConfig
{
    public string? Condition { get; set; } // e.g., "{{stars}} > 1000"
}

高级特征

有条件触发器

// After workflow completes
if (execution.Status == WorkflowExecutionStatus.Completed)
{
    var dependentTriggers = await _context.WorkflowTriggerStates
        .Where(t => t.TriggerType == "WorkflowComplete" &&
                    t.ConfigJson.Contains(execution.WorkflowId))
        .ToListAsync();

    foreach (var trigger in dependentTriggers)
    {
        await _executionService.ExecuteWorkflowAsync(
            trigger.WorkflowDefinition.WorkflowId,
            execution.OutputData,
            $"Triggered by {execution.WorkflowId}");
    }
}

只有满足某些条件时,才会触发:

触发

[Fact]
public async Task ExecuteScheduledWorkflows_ShouldExecuteWhenIntervalPassed()
{
    // Arrange
    var mockContext = CreateMockContext();
    var mockExecutionService = new Mock<IWorkflowExecutionService>();
    var job = new WorkflowSchedulerJob(mockContext.Object,
        mockExecutionService.Object, Mock.Of<ILogger>());

    // Act
    await job.ExecuteScheduledWorkflowsAsync();

    // Assert
    mockExecutionService.Verify(s => s.ExecuteWorkflowAsync(
        It.IsAny<string>(),
        It.IsAny<Dictionary<string, object>>(),
        "Scheduler",
        It.IsAny<CancellationToken>()), Times.Once);
}

链触发器-一个工作流程的完成触发另一个:

夜间工作测试

✅ **单位测试您的工作 :**结论 结论 结论 结论 结论 ✅ **我们建造了一个完整的自动化系统!**我们的工作流程现在可以: ✅ 按时间表运行- 每小时、每日或按规定间隔时间 ✅ 票价价格指数- 监测外部服务的变化 ✅ 音轨状态- 记住我们以前所见过的 ✅ 自自动重新导入- 处理瞬时故障

监视监视器

  • 所有工作都用纸板

  • 比例比 比额表 比额表- 点火处理负负负平衡

  • 完整系列我们从零开始建立了一个企业级工作流程系统:

  • 第一部分 第一部分:导言和结构

  • 第二部分 第二部分核心工作流程引擎

第三部分 第三部分

  • :视觉工作流程编辑器
  • 第四部分 第四部分
  • :营火整合(本员额)
  • 你现在有:
  • 强大的工作流程引擎

美丽的视觉编辑

自动执行

  • APPI 监测完全可观测
  • **下一个是什么?**可能的改进:
  • Webhoooks 网络图:通过 HTTP 端点触发工作流程
  • 电子邮件节点: 从工作流程发送邮件
  • 数据库节点:查询数据库
  • AI 节点:与LLMM公司结合

次级工作流量

: 组合工作流程

  • 工作流程市场Mostlylucid.SchedulerService/Jobs/
  • :共享工作流程模板Mostlylucid.Workflow.Shared/
  • 源代码Mostlylucid.Workflow.Engine/

Thank you for following this series! Happy workflow building! 🎉

Finding related posts...
logo

© 2026 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.