内第三部分 第三部分我们建造了一个美丽的视觉编辑
监测所有背景工作
为什么要着火?
绞刑是适合我们需要的 因为它:
我们现有PostgreSQL数据库的仓库工作
提供内置仪式仪表板
支持经常性工作
水平水平缩平
[Table("workflow_trigger_states")]
public class WorkflowTriggerStateEntity
{
public int Id { get; set; }
public int WorkflowDefinitionId { get; set; }
// Type: "Schedule", "ApiPoll", "Webhook"
public string TriggerType { get; set; } = string.Empty;
// Configuration as JSON
public string ConfigJson { get; set; } = "{}";
// Current state as JSON (stores last poll time, content hash, etc.)
public string StateJson { get; set; } = "{}";
public bool IsEnabled { get; set; } = true;
public DateTime? LastCheckedAt { get; set; }
public DateTime? LastFiredAt { get; set; }
public int FireCount { get; set; } = 0;
public string? LastError { get; set; }
}
触发国模式
public class ScheduleTriggerConfig
{
public string IntervalType { get; set; } = "minutes"; // minutes, hours, days
public int IntervalValue { get; set; } = 60;
public Dictionary<string, object>? InputData { get; set; }
}
public class WorkflowSchedulerJob
{
private readonly MostlylucidDbContext _context;
private readonly WorkflowExecutionService _executionService;
private readonly ILogger<WorkflowSchedulerJob> _logger;
[AutomaticRetry(Attempts = 3)]
public async Task ExecuteScheduledWorkflowsAsync()
{
_logger.LogInformation("Checking for scheduled workflows");
// Get all enabled schedule triggers
var triggers = await _context.WorkflowTriggerStates
.Include(t => t.WorkflowDefinition)
.Where(t => t.IsEnabled && t.TriggerType == "Schedule")
.ToListAsync();
foreach (var trigger in triggers)
{
try
{
var config = JsonSerializer.Deserialize<ScheduleTriggerConfig>(
trigger.ConfigJson);
if (config == null) continue;
// Check if it's time to run
if (!ShouldRunScheduledWorkflow(trigger, config))
continue;
_logger.LogInformation(
"Executing scheduled workflow {WorkflowId}",
trigger.WorkflowDefinition.WorkflowId);
// Execute the workflow
await _executionService.ExecuteWorkflowAsync(
trigger.WorkflowDefinition.WorkflowId,
config.InputData,
"Scheduler");
// Update trigger state
trigger.LastCheckedAt = DateTime.UtcNow;
trigger.LastFiredAt = DateTime.UtcNow;
trigger.FireCount++;
var state = JsonSerializer.Deserialize<Dictionary<string, object>>(
trigger.StateJson) ?? new();
state["lastRun"] = DateTime.UtcNow.ToString("O");
trigger.StateJson = JsonSerializer.Serialize(state);
await _context.SaveChangesAsync();
}
catch (Exception ex)
{
_logger.LogError(ex,
"Error executing scheduled workflow {TriggerId}",
trigger.Id);
trigger.LastError = ex.Message;
await _context.SaveChangesAsync();
}
}
}
private bool ShouldRunScheduledWorkflow(
WorkflowTriggerStateEntity trigger,
ScheduleTriggerConfig config)
{
// First run?
if (!trigger.LastFiredAt.HasValue)
return true;
var timeSinceLastRun = DateTime.UtcNow - trigger.LastFiredAt.Value;
return config.IntervalType.ToLower() switch
{
"minutes" => timeSinceLastRun.TotalMinutes >= config.IntervalValue,
"hours" => timeSinceLastRun.TotalHours >= config.IntervalValue,
"days" => timeSinceLastRun.TotalDays >= config.IntervalValue,
_ => false
};
}
}
配置模型
ExecuteScheduledWorkflowsAsync()以上次运行时间更新触发状态
public class ApiPollTriggerConfig
{
public string Url { get; set; } = string.Empty;
public int IntervalSeconds { get; set; } = 300; // 5 minutes
public bool AlwaysTrigger { get; set; } = false;
public Dictionary<string, string>? Headers { get; set; }
}
[AutomaticRetry(Attempts = 3)]
public async Task PollApiTriggersAsync()
{
_logger.LogInformation("Polling API triggers");
var triggers = await _context.WorkflowTriggerStates
.Include(t => t.WorkflowDefinition)
.Where(t => t.IsEnabled && t.TriggerType == "ApiPoll")
.ToListAsync();
foreach (var trigger in triggers)
{
try
{
var config = JsonSerializer.Deserialize<ApiPollTriggerConfig>(
trigger.ConfigJson);
if (config == null) continue;
// Check if it's time to poll
if (trigger.LastCheckedAt.HasValue)
{
var timeSinceLastCheck = DateTime.UtcNow - trigger.LastCheckedAt.Value;
if (timeSinceLastCheck.TotalSeconds < config.IntervalSeconds)
continue;
}
_logger.LogInformation("Polling API for workflow {WorkflowId}",
trigger.WorkflowDefinition.WorkflowId);
// Poll the API
using var httpClient = new HttpClient();
var response = await httpClient.GetAsync(config.Url);
var content = await response.Content.ReadAsStringAsync();
// Get previous state
var state = JsonSerializer.Deserialize<Dictionary<string, object>>(
trigger.StateJson) ?? new();
var previousHash = state.GetValueOrDefault("contentHash")?.ToString();
var currentHash = ComputeHash(content);
// Has content changed?
if (previousHash != currentHash || config.AlwaysTrigger)
{
_logger.LogInformation(
"API content changed, triggering workflow {WorkflowId}",
trigger.WorkflowDefinition.WorkflowId);
// Pass response as input to workflow
var inputData = new Dictionary<string, object>
{
["apiResponse"] = content,
["statusCode"] = (int)response.StatusCode,
["previousHash"] = previousHash ?? string.Empty,
["currentHash"] = currentHash
};
// Execute the workflow
await _executionService.ExecuteWorkflowAsync(
trigger.WorkflowDefinition.WorkflowId,
inputData,
$"ApiPoll:{config.Url}");
trigger.LastFiredAt = DateTime.UtcNow;
trigger.FireCount++;
// Update state
state["contentHash"] = currentHash;
state["lastContent"] = content.Length > 1000
? content.Substring(0, 1000)
: content;
state["lastPoll"] = DateTime.UtcNow.ToString("O");
}
trigger.LastCheckedAt = DateTime.UtcNow;
trigger.StateJson = JsonSerializer.Serialize(state);
trigger.LastError = null;
await _context.SaveChangesAsync();
}
catch (Exception ex)
{
_logger.LogError(ex, "Error polling API trigger {TriggerId}",
trigger.Id);
trigger.LastError = ex.Message;
trigger.LastCheckedAt = DateTime.UtcNow;
await _context.SaveChangesAsync();
}
}
}
private string ComputeHash(string content)
{
using var sha256 = System.Security.Cryptography.SHA256.Create();
var bytes = System.Text.Encoding.UTF8.GetBytes(content);
var hash = sha256.ComputeHash(bytes);
return Convert.ToBase64String(hash);
}
配置模型
AlwaysTrigger与以前储存在州内的散列相比以新的散列更新状态
{
"triggerType": "ApiPoll",
"config": {
"url": "https://api.github.com/repos/dotnet/aspnetcore/releases/latest",
"intervalSeconds": 3600,
"alwaysTrigger": false
}
}
使用案例示例
这是每小时一次的GitHub API民调。Program.cs当发布新版本时, 内容会更改散列, 而工作流程会随发布数据执行 !
// Add Hangfire services
builder.Services.AddHangfire(config =>
{
config.UsePostgreSqlStorage(
builder.Configuration.GetConnectionString("DefaultConnection"));
});
builder.Services.AddHangfireServer();
// Register our job
builder.Services.AddScoped<WorkflowSchedulerJob>();
挂牌点火工作
app.UseHangfireDashboard("/hangfire");
// Register recurring jobs
RecurringJob.AddOrUpdate<WorkflowSchedulerJob>(
"scheduled-workflows",
job => job.ExecuteScheduledWorkflowsAsync(),
Cron.Minutely);
RecurringJob.AddOrUpdate<WorkflowSchedulerJob>(
"api-poll-triggers",
job => job.PollApiTriggersAsync(),
Cron.Minutely);
启动配置 :/hangfire:
app.UseHangfireDashboard("/hangfire", new DashboardOptions
{
Authorization = new[]
{
new HangfireAuthorizationFilter()
}
});
public class HangfireAuthorizationFilter : IDashboardAuthorizationFilter
{
public bool Authorize(DashboardContext context)
{
var httpContext = context.GetHttpContext();
// Only allow authenticated users
return httpContext.User.Identity?.IsAuthenticated == true;
}
}
监视燃火服务器
[HttpPost("workflow/{id}/triggers")]
public async Task<IActionResult> CreateTrigger(
string id,
[FromBody] TriggerCreateRequest request)
{
var workflow = await _context.WorkflowDefinitions
.FirstOrDefaultAsync(w => w.WorkflowId == id);
if (workflow == null)
return NotFound();
var trigger = new WorkflowTriggerStateEntity
{
WorkflowDefinitionId = workflow.Id,
TriggerType = request.Type,
ConfigJson = JsonSerializer.Serialize(request.Config),
StateJson = "{}",
IsEnabled = true
};
await _context.WorkflowTriggerStates.AddAsync(trigger);
await _context.SaveChangesAsync();
return Json(new { success = true, triggerId = trigger.Id });
}
public class TriggerCreateRequest
{
public string Type { get; set; } = string.Empty; // Schedule, ApiPoll
public object Config { get; set; } = new();
}
<div class="card bg-base-100 shadow-xl">
<div class="card-body">
<h2 class="card-title">⏰ Add Trigger</h2>
<div class="form-control">
<label class="label">Trigger Type</label>
<select class="select select-bordered" x-model="triggerType">
<option value="Schedule">Schedule</option>
<option value="ApiPoll">API Poll</option>
</select>
</div>
<!-- Schedule Config -->
<template x-if="triggerType === 'Schedule'">
<div class="space-y-4">
<div class="form-control">
<label class="label">Interval</label>
<div class="flex gap-2">
<input type="number"
x-model="scheduleConfig.intervalValue"
class="input input-bordered flex-1" />
<select x-model="scheduleConfig.intervalType"
class="select select-bordered">
<option value="minutes">Minutes</option>
<option value="hours">Hours</option>
<option value="days">Days</option>
</select>
</div>
</div>
</div>
</template>
<!-- API Poll Config -->
<template x-if="triggerType === 'ApiPoll'">
<div class="space-y-4">
<div class="form-control">
<label class="label">API URL</label>
<input type="url"
x-model="apiConfig.url"
class="input input-bordered"
placeholder="https://api.example.com/data" />
</div>
<div class="form-control">
<label class="label">Poll Interval (seconds)</label>
<input type="number"
x-model="apiConfig.intervalSeconds"
class="input input-bordered"
value="300" />
</div>
</div>
</template>
<button @click="createTrigger()" class="btn btn-primary mt-4">
Create Trigger
</button>
</div>
</div>
让我们添加用于创建和管理触发器的UI:
{
"name": "GitHub Release Monitor",
"startNodeId": "parse-data",
"nodes": [
{
"id": "parse-data",
"type": "Transform",
"name": "Extract Version",
"inputs": {
"operation": "json_parse",
"data": "{{apiResponse}}"
}
},
{
"id": "log-release",
"type": "Log",
"name": "Log New Release",
"inputs": {
"message": "New release: {{tag_name}} - {{name}}",
"level": "info"
}
}
],
"connections": [
{
"sourceNodeId": "parse-data",
"targetNodeId": "log-release"
}
]
}
{
"type": "ApiPoll",
"config": {
"url": "https://api.github.com/repos/dotnet/aspnetcore/releases/latest",
"intervalSeconds": 3600
}
}
(可以发送电子邮件、邮寄至Slack等)
工作流程解析 JSON 并记录发布信息
_logger.LogInformation(
"Workflow {WorkflowId} execution {ExecutionId} completed in {Duration}ms with status {Status}",
execution.WorkflowId,
execution.Id,
execution.DurationMs,
execution.Status);
伐木
private static readonly Counter WorkflowExecutions = Metrics
.CreateCounter("workflow_executions_total",
"Total workflow executions",
new CounterConfiguration
{
LabelNames = new[] { "workflow_id", "status" }
});
// In execution service
WorkflowExecutions
.WithLabels(workflow.Id, execution.Status.ToString())
.Inc();
计量数
尚未在预期的时间内发射的触发器
绩效考量
// Instead of querying per trigger
var triggers = await _context.WorkflowTriggerStates
.Include(t => t.WorkflowDefinition)
.Where(t => t.IsEnabled && t.TriggerType == "ApiPoll")
.AsNoTracking() // Read-only
.ToListAsync();
由于许多工作流程的民意测验频繁进行,数据库载重可能很大:
解决方案: 批量查询
catch (HttpRequestException ex) when (ex.StatusCode == HttpStatusCode.TooManyRequests)
{
// Back off
var retryAfter = response.Headers.RetryAfter?.Delta ?? TimeSpan.FromMinutes(5);
state["backoffUntil"] = DateTime.UtcNow.Add(retryAfter).ToString("O");
}
解决方案: 指数回扣
public class ConditionalTriggerConfig : ApiPollTriggerConfig
{
public string? Condition { get; set; } // e.g., "{{stars}} > 1000"
}
有条件触发器
// After workflow completes
if (execution.Status == WorkflowExecutionStatus.Completed)
{
var dependentTriggers = await _context.WorkflowTriggerStates
.Where(t => t.TriggerType == "WorkflowComplete" &&
t.ConfigJson.Contains(execution.WorkflowId))
.ToListAsync();
foreach (var trigger in dependentTriggers)
{
await _executionService.ExecuteWorkflowAsync(
trigger.WorkflowDefinition.WorkflowId,
execution.OutputData,
$"Triggered by {execution.WorkflowId}");
}
}
触发
[Fact]
public async Task ExecuteScheduledWorkflows_ShouldExecuteWhenIntervalPassed()
{
// Arrange
var mockContext = CreateMockContext();
var mockExecutionService = new Mock<IWorkflowExecutionService>();
var job = new WorkflowSchedulerJob(mockContext.Object,
mockExecutionService.Object, Mock.Of<ILogger>());
// Act
await job.ExecuteScheduledWorkflowsAsync();
// Assert
mockExecutionService.Verify(s => s.ExecuteWorkflowAsync(
It.IsAny<string>(),
It.IsAny<Dictionary<string, object>>(),
"Scheduler",
It.IsAny<CancellationToken>()), Times.Once);
}
夜间工作测试
✅ **单位测试您的工作 :**结论 结论 结论 结论 结论 ✅ **我们建造了一个完整的自动化系统!**我们的工作流程现在可以: ✅ 按时间表运行- 每小时、每日或按规定间隔时间 ✅ 票价价格指数- 监测外部服务的变化 ✅ 音轨状态- 记住我们以前所见过的 ✅ 自自动重新导入- 处理瞬时故障
所有工作都用纸板
比例比 比额表 比额表- 点火处理负负负平衡
完整系列我们从零开始建立了一个企业级工作流程系统:
第一部分 第一部分:导言和结构
第二部分 第二部分核心工作流程引擎
第三部分 第三部分
自动执行
: 组合工作流程
Mostlylucid.SchedulerService/Jobs/Mostlylucid.Workflow.Shared/Mostlylucid.Workflow.Engine/Thank you for following this series! Happy workflow building! 🎉
© 2026 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.