This is a viewer only at the moment; see the article on how this works.
To update the preview hit Ctrl-Alt-R (or ⌘-Alt-R on Mac) or Enter to refresh. The Save icon lets you save the markdown file to disk
This is a preview from the server running through my markdig pipeline
Wednesday, 15 January 2025
Вступ: у Частині 1 ми представили ідею створення власної системи робочих процесів.
Тепер настав час перейти до практики!
Декілька вбудованих типів вузлів
Mostlylucid.Workflow.Shared/ # Shared models and DTOs
├── Models/
│ ├── WorkflowNode.cs # Node definition
│ ├── NodeConnection.cs # Connections between nodes
│ ├── WorkflowDefinition.cs # Complete workflow definition
│ └── WorkflowExecution.cs # Execution tracking
Mostlylucid.Workflow.Engine/ # Core execution engine
├── Interfaces/
│ ├── IWorkflowNode.cs # Node interface
│ ├── IWorkflowExecutor.cs # Executor interface
│ └── INodeRegistry.cs # Node registry interface
├── Execution/
│ ├── NodeRegistry.cs # Registry for node types
│ └── WorkflowExecutor.cs # Main execution engine
└── Nodes/
├── BaseWorkflowNode.cs # Base node implementation
├── HttpRequestNode.cs # HTTP API calls
├── TransformNode.cs # Data transformation
└── DelayNode.cs # Delay execution
Mostlylucid.Shared/Entities/ # Database entities (EF Core)
├── WorkflowDefinitionEntity.cs
├── WorkflowExecutionEntity.cs
└── WorkflowTriggerStateEntity.cs
Основні моделіWorkflowNodeWorkflowNode
/// <summary>
/// Definition of a single workflow node: identity, configuration dictionaries
/// and the visual properties attached to it.
/// </summary>
public class WorkflowNode
{
    /// <summary>Node identifier; a new GUID string is generated per instance.</summary>
    public string Id { get; set; } = Guid.NewGuid().ToString();

    /// <summary>Name of the node type; empty until set.</summary>
    public string Type { get; set; } = "";

    /// <summary>Display name; empty until set.</summary>
    public string Name { get; set; } = "";

    /// <summary>Optional free-text description.</summary>
    public string? Description { get; set; }

    // ----- Configuration -----

    /// <summary>Input values keyed by input name.</summary>
    public Dictionary<string, object> Inputs { get; set; } = new Dictionary<string, object>();

    /// <summary>String-to-string output map. NOTE(review): presumably output name to template - confirm.</summary>
    public Dictionary<string, string> Outputs { get; set; } = new Dictionary<string, string>();

    /// <summary>String-to-string condition map. NOTE(review): semantics are not visible here - confirm.</summary>
    public Dictionary<string, string> Conditions { get; set; } = new Dictionary<string, string>();

    // ----- Visual properties -----

    /// <summary>Position of the node.</summary>
    public NodePosition Position { get; set; } = new NodePosition();

    /// <summary>Colours, icon and size of the node.</summary>
    public NodeStyle Style { get; set; } = new NodeStyle();
}
/// <summary>
/// Visual styling of a node: three colour strings, an optional icon and a size.
/// </summary>
public class NodeStyle
{
    /// <summary>Background colour; defaults to <c>#3B82F6</c>.</summary>
    public string BackgroundColor { get; set; } = "#3B82F6";

    /// <summary>Text colour; defaults to <c>#FFFFFF</c>.</summary>
    public string TextColor { get; set; } = "#FFFFFF";

    /// <summary>Border colour; defaults to <c>#2563EB</c>.</summary>
    public string BorderColor { get; set; } = "#2563EB";

    /// <summary>Optional icon; <c>null</c> when none is set.</summary>
    public string? Icon { get; set; }

    /// <summary>Width; defaults to 200.</summary>
    public int Width { get; set; } = 200;

    /// <summary>Height; defaults to 100.</summary>
    public int Height { get; set; } = 100;
}
The
{{variable}}: Вузли знають, як відтворювати себе: Значення можуть використовувати
/// <summary>
/// A directed link from an output of one node to an input of another node.
/// </summary>
public class NodeConnection
{
    /// <summary>Connection identifier; a new GUID string is generated per instance.</summary>
    public string Id { get; set; } = Guid.NewGuid().ToString();

    /// <summary>Id of the node the connection starts from.</summary>
    public string SourceNodeId { get; set; } = "";

    /// <summary>Id of the node the connection leads to.</summary>
    public string TargetNodeId { get; set; } = "";

    /// <summary>Name of the source output; defaults to <c>default</c>.</summary>
    public string SourceOutput { get; set; } = "default";

    /// <summary>Name of the target input; defaults to <c>default</c>.</summary>
    public string TargetInput { get; set; } = "default";

    /// <summary>Optional condition expression; <c>null</c> when the connection is unconditional.</summary>
    public string? Condition { get; set; }

    /// <summary>Optional label for the connection.</summary>
    public string? Label { get; set; }
}
синтаксис для динамічних даних
Змінені вхідні/ вихідні дані для ясності
/// <summary>
/// A complete workflow: metadata, the nodes, the connections between them and
/// optional workflow-level variables.
/// </summary>
public class WorkflowDefinition
{
    /// <summary>Workflow identifier; a new GUID string is generated per instance.</summary>
    public string Id { get; set; } = Guid.NewGuid().ToString();

    /// <summary>Workflow name; empty until set.</summary>
    public string Name { get; set; } = "";

    /// <summary>Optional free-text description.</summary>
    public string? Description { get; set; }

    /// <summary>Version number; starts at 1.</summary>
    public int Version { get; set; } = 1;

    /// <summary>Nodes that make up the workflow.</summary>
    public List<WorkflowNode> Nodes { get; set; } = new List<WorkflowNode>();

    /// <summary>Connections between the nodes.</summary>
    public List<NodeConnection> Connections { get; set; } = new List<NodeConnection>();

    /// <summary>Id of the node execution starts from; <c>null</c> when not set.</summary>
    public string? StartNodeId { get; set; }

    /// <summary>Tags attached to the workflow.</summary>
    public List<string> Tags { get; set; } = new List<string>();

    /// <summary>Enabled flag; <c>true</c> by default.</summary>
    public bool IsEnabled { get; set; } = true;

    /// <summary>Optional workflow-level variables; <c>null</c> when none are defined.</summary>
    public Dictionary<string, object>? Variables { get; set; }
}
Робота з висновкамиCommentWorkflowExecutorПовний робочий процес:
/// <summary>
/// Executes a workflow starting at its start node and returns the execution record.
/// </summary>
/// <param name="workflow">Workflow definition to run.</param>
/// <param name="inputData">
/// Optional initial values for the shared context. They are layered on top of
/// <see cref="WorkflowDefinition.Variables"/>, so an input with the same key wins.
/// </param>
/// <param name="triggeredBy">
/// Optional trigger identifier. NOTE(review): not used by the visible code - confirm
/// whether it should be recorded on the execution.
/// </param>
/// <param name="cancellationToken">Token passed down to node execution.</param>
/// <returns>
/// The execution record. Failures (including validation errors) do not propagate:
/// they are reported through <c>Status = Failed</c> and <c>ErrorMessage</c>.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="workflow"/> is null.</exception>
public async Task<WorkflowExecution> ExecuteAsync(
    WorkflowDefinition workflow,
    Dictionary<string, object>? inputData = null,
    string? triggeredBy = null,
    CancellationToken cancellationToken = default)
{
    if (workflow == null)
    {
        throw new ArgumentNullException(nameof(workflow));
    }

    // Seed the shared context with workflow-level variables first; without this,
    // templates such as {{owner}} that rely on the definition's "variables"
    // section can never be resolved. Caller input overrides variables.
    var initialContext = workflow.Variables != null
        ? new Dictionary<string, object>(workflow.Variables)
        : new Dictionary<string, object>();
    if (inputData != null)
    {
        foreach (var (key, value) in inputData)
        {
            initialContext[key] = value;
        }
    }

    var execution = new WorkflowExecution
    {
        Id = Guid.NewGuid().ToString(),
        WorkflowId = workflow.Id,
        Status = WorkflowExecutionStatus.Running,
        StartedAt = DateTime.UtcNow,
        InputData = inputData,
        Context = initialContext
    };

    try
    {
        // Validate workflow
        var validationErrors = ValidateWorkflow(workflow);
        if (validationErrors.Any())
        {
            throw new InvalidOperationException(
                $"Workflow validation failed: {string.Join(", ", validationErrors)}");
        }

        // Create execution context; Data is the same dictionary instance as
        // execution.Context, so node output is visible on the execution record.
        var context = new WorkflowExecutionContext
        {
            Execution = execution,
            Workflow = workflow,
            Data = execution.Context,
            Services = _serviceProvider
        };

        // Find and execute start node
        var startNode = workflow.Nodes.FirstOrDefault(n => n.Id == workflow.StartNodeId);
        if (startNode == null)
        {
            throw new InvalidOperationException("No start node found");
        }

        await ExecuteNodeRecursiveAsync(startNode, context, cancellationToken);

        execution.Status = WorkflowExecutionStatus.Completed;
        execution.CompletedAt = DateTime.UtcNow;
        execution.OutputData = context.Data;
    }
    catch (Exception ex)
    {
        execution.Status = WorkflowExecutionStatus.Failed;
        execution.ErrorMessage = ex.Message;
        // Stamp the end time on failure as well, so failed runs have a duration.
        execution.CompletedAt = DateTime.UtcNow;
        // ... error handling
    }

    return execution;
}
WorkflowExecutor — це мозок нашої системи.
/// <summary>
/// Executes one node, records its result, and then recursively executes the
/// nodes reachable through its outgoing connections.
/// </summary>
/// <param name="nodeConfig">Definition of the node to execute.</param>
/// <param name="context">Execution context shared by every node of this run.</param>
/// <param name="cancellationToken">Checked before each node and passed to the node.</param>
/// <exception cref="InvalidOperationException">
/// The node type is not registered, or the node failed and no usable
/// <c>error</c> connection exists.
/// </exception>
/// <exception cref="OperationCanceledException">Cancellation was requested.</exception>
/// <remarks>
/// NOTE(review): there is no visited-node tracking, so a cyclic connection graph
/// recurses without bound - confirm that validation rejects cycles.
/// </remarks>
private async Task ExecuteNodeRecursiveAsync(
    WorkflowNode nodeConfig,
    WorkflowExecutionContext context,
    CancellationToken cancellationToken)
{
    // Not every node observes the token itself, so check between nodes.
    cancellationToken.ThrowIfCancellationRequested();

    // Get node implementation from registry
    var node = _nodeRegistry.GetNode(nodeConfig.Type);
    if (node == null)
    {
        throw new InvalidOperationException($"Node type '{nodeConfig.Type}' not registered");
    }

    // Execute the node and record execution history
    var result = await node.ExecuteAsync(nodeConfig, context, cancellationToken);
    context.Execution.NodeExecutions.Add(result);

    // Store outputs for downstream nodes and merge them into the shared context
    if (result.OutputData != null)
    {
        context.NodeOutputs[nodeConfig.Id] = result.OutputData;
        foreach (var (key, value) in result.OutputData)
        {
            context.Data[key] = value;
        }
    }

    // Handle failure with error routing
    if (result.Status == NodeExecutionStatus.Failed)
    {
        var errorConnection = context.Workflow.Connections
            .FirstOrDefault(c => c.SourceNodeId == nodeConfig.Id &&
                                 c.SourceOutput == "error");
        var errorNode = errorConnection == null
            ? null
            : FindNode(errorConnection.TargetNodeId);

        if (errorNode == null)
        {
            // A specific exception type instead of the bare System.Exception.
            throw new InvalidOperationException(
                $"Node {nodeConfig.Id} failed: {result.ErrorMessage}");
        }

        // Route to the error handler; normal downstream nodes are skipped.
        await ExecuteNodeRecursiveAsync(errorNode, context, cancellationToken);
        return;
    }

    // Find and execute downstream nodes (everything except the error output)
    var outgoingConnections = context.Workflow.Connections
        .Where(c => c.SourceNodeId == nodeConfig.Id && c.SourceOutput != "error")
        .ToList();

    foreach (var connection in outgoingConnections)
    {
        // A connection with a condition is followed only when it evaluates to true.
        if (!string.IsNullOrEmpty(connection.Condition) &&
            !EvaluateCondition(connection.Condition, context))
        {
            continue;
        }

        // Connections pointing at an unknown node are skipped, as before.
        var targetNode = FindNode(connection.TargetNodeId);
        if (targetNode != null)
        {
            await ExecuteNodeRecursiveAsync(targetNode, context, cancellationToken);
        }
    }

    // Looks up a node definition of the current workflow by id.
    WorkflowNode? FindNode(string nodeId) =>
        context.Workflow.Nodes.FirstOrDefault(n => n.Id == nodeId);
}
Він інтерпретує визначення робочих процесів і виконує їх.
Проста для розуміння і реалізаціїNodeRegistryПриродний потік виконання
/// <summary>
/// Maps node type names to their implementing CLR types and creates node instances.
/// </summary>
public class NodeRegistry : INodeRegistry
{
    private readonly Dictionary<string, Type> _nodeTypes = new();
    private readonly IServiceProvider? _serviceProvider;

    /// <summary>
    /// Creates the registry.
    /// </summary>
    /// <param name="serviceProvider">
    /// Provider used to resolve node instances. Optional so that
    /// <c>new NodeRegistry()</c> keeps working; without it only the
    /// <see cref="Activator"/> fallback is used.
    /// </param>
    public NodeRegistry(IServiceProvider? serviceProvider = null)
    {
        // The field was previously never assigned, so GetNode always failed with
        // a NullReferenceException for every registered type.
        _serviceProvider = serviceProvider;
    }

    /// <summary>
    /// Registers <typeparamref name="TNode"/> under <paramref name="nodeType"/>,
    /// replacing any earlier registration with the same name.
    /// </summary>
    /// <param name="nodeType">Type name used to look the node up.</param>
    public void RegisterNode<TNode>(string nodeType) where TNode : IWorkflowNode
    {
        _nodeTypes[nodeType] = typeof(TNode);
    }

    /// <summary>
    /// Creates or resolves the node registered under <paramref name="nodeType"/>.
    /// </summary>
    /// <param name="nodeType">Type name to look up.</param>
    /// <returns>The node instance, or <c>null</c> when the name is not registered.</returns>
    /// <remarks>
    /// The <see cref="Activator"/> fallback requires a public parameterless constructor
    /// and throws <see cref="MissingMethodException"/> otherwise.
    /// </remarks>
    public IWorkflowNode? GetNode(string nodeType)
    {
        if (!_nodeTypes.TryGetValue(nodeType, out var type))
        {
            return null;
        }

        // Try DI first, fallback to Activator
        if (_serviceProvider?.GetService(type) is IWorkflowNode resolved)
        {
            return resolved;
        }

        return Activator.CreateInstance(type) as IWorkflowNode;
    }
}
Легко додати паралельне виконання пізніше
Легке додавання нетипових вузлів
/// <summary>
/// Workflow node that performs an HTTP request (GET, POST, PUT or DELETE) and
/// exposes the status code, the body and a success flag as output.
/// </summary>
public class HttpRequestNode : BaseWorkflowNode
{
    private readonly IHttpClientFactory _httpClientFactory;

    /// <summary>
    /// Creates the node with the factory used to obtain HTTP clients.
    /// </summary>
    /// <param name="httpClientFactory">Factory for <see cref="HttpClient"/> instances.</param>
    /// <exception cref="ArgumentNullException"><paramref name="httpClientFactory"/> is null.</exception>
    public HttpRequestNode(IHttpClientFactory httpClientFactory)
    {
        // The field was previously never assigned, so every execution failed with
        // a NullReferenceException at CreateClient().
        _httpClientFactory = httpClientFactory
            ?? throw new ArgumentNullException(nameof(httpClientFactory));
    }

    public override string NodeType => "HttpRequest";

    /// <summary>
    /// Resolves the inputs <c>url</c>, <c>method</c> (default <c>GET</c>),
    /// <c>headers</c> and <c>body</c>, sends the request and returns a success
    /// result with <c>statusCode</c>, <c>body</c> and <c>isSuccess</c>.
    /// </summary>
    /// <param name="nodeConfig">Node definition whose inputs configure the request.</param>
    /// <param name="context">Execution context used to resolve templates.</param>
    /// <param name="cancellationToken">Token passed to the HTTP calls.</param>
    /// <exception cref="InvalidOperationException">
    /// The <c>url</c> input is missing or blank, or the method is not supported.
    /// </exception>
    public override async Task<NodeExecutionResult> ExecuteAsync(
        WorkflowNode nodeConfig,
        WorkflowExecutionContext context,
        CancellationToken cancellationToken = default)
    {
        var resolvedInputs = ResolveTemplates(nodeConfig.Inputs, context);

        var url = resolvedInputs.GetValueOrDefault("url")?.ToString();
        if (string.IsNullOrWhiteSpace(url))
        {
            throw new InvalidOperationException("HttpRequest node requires a non-empty 'url' input");
        }

        var method = resolvedInputs.GetValueOrDefault("method")?.ToString() ?? "GET";
        // NOTE(review): headers are applied only when the resolved value is a
        // Dictionary<string, object>; confirm ResolveTemplates yields that shape for
        // definitions loaded from JSON, otherwise headers are silently dropped.
        var headers = resolvedInputs.GetValueOrDefault("headers") as Dictionary<string, object>;
        var body = resolvedInputs.GetValueOrDefault("body");

        var client = _httpClientFactory.CreateClient();

        // Add headers
        if (headers != null)
        {
            foreach (var (key, value) in headers)
            {
                client.DefaultRequestHeaders.TryAddWithoutValidation(
                    key, value?.ToString() ?? string.Empty);
            }
        }

        // Make request. Invariant casing keeps method matching independent of the
        // current culture; the response is disposed once the body has been read.
        using HttpResponseMessage response = method.ToUpperInvariant() switch
        {
            "GET" => await client.GetAsync(url, cancellationToken),
            "POST" => await client.PostAsJsonAsync(url, body, cancellationToken),
            "PUT" => await client.PutAsJsonAsync(url, body, cancellationToken),
            "DELETE" => await client.DeleteAsync(url, cancellationToken),
            _ => throw new InvalidOperationException($"Unsupported method: {method}")
        };

        var responseBody = await response.Content.ReadAsStringAsync(cancellationToken);

        var outputData = new Dictionary<string, object>
        {
            ["statusCode"] = (int)response.StatusCode,
            ["body"] = ParseResponseBody(responseBody),
            ["isSuccess"] = response.IsSuccessStatusCode
        };

        return CreateSuccessResult(nodeConfig, outputData, resolvedInputs);
    }

    // Returns the body as a dictionary when it is a JSON object, otherwise the raw
    // text. Empty bodies (e.g. 204 responses), JSON arrays and non-JSON payloads
    // previously made the node throw JsonException instead of falling back.
    private static object ParseResponseBody(string responseBody)
    {
        if (string.IsNullOrWhiteSpace(responseBody))
        {
            return responseBody;
        }

        try
        {
            var parsed = JsonSerializer.Deserialize<Dictionary<string, object>>(responseBody);
            if (parsed != null)
            {
                return parsed;
            }
        }
        catch (JsonException)
        {
            // Not a JSON object - fall through and expose the raw text.
        }

        return responseBody;
    }
}
Підтримка ін'єкції залежностей
{
"type": "HttpRequest",
"inputs": {
"url": "https://api.github.com/repos/{{owner}}/{{repo}}",
"method": "GET",
"headers": {
"Authorization": "Bearer {{apiToken}}",
"Accept": "application/vnd.github+json"
}
},
"outputs": {
"repoData": "{{body}}",
"statusCode": "{{statusCode}}"
}
}
Вбудовані вузли
/// <summary>
/// Workflow node that applies a simple transformation to its <c>data</c> input.
/// Supported operations: <c>uppercase</c>, <c>lowercase</c>, <c>trim</c>,
/// <c>length</c>, <c>json_parse</c> and <c>json_stringify</c>; any other value
/// passes the data through unchanged.
/// </summary>
public class TransformNode : BaseWorkflowNode
{
    public override string NodeType => "Transform";

    /// <summary>
    /// Resolves the <c>operation</c> and <c>data</c> inputs and returns a success
    /// result whose output contains the transformed value under <c>result</c>.
    /// </summary>
    /// <param name="nodeConfig">Node definition whose inputs configure the transform.</param>
    /// <param name="context">Execution context used to resolve templates.</param>
    /// <param name="cancellationToken">Not used; the transform has no asynchronous work.</param>
    /// <exception cref="JsonException"><c>json_parse</c> received text that is not a JSON object.</exception>
    // No awaits inside: the method stays async so that exceptions surface through
    // the returned Task, exactly as before.
    public override async Task<NodeExecutionResult> ExecuteAsync(
        WorkflowNode nodeConfig,
        WorkflowExecutionContext context,
        CancellationToken cancellationToken = default)
    {
        var resolvedInputs = ResolveTemplates(nodeConfig.Inputs, context);
        var operation = resolvedInputs.GetValueOrDefault("operation")?.ToString();
        var inputData = resolvedInputs.GetValueOrDefault("data");
        var text = inputData?.ToString();

        // Invariant casing: with culture-sensitive ToLower() an operation such as
        // "TRIM" does not match "trim" under e.g. Turkish culture, and the casing
        // transforms themselves would depend on the server locale.
        object result = operation?.ToLowerInvariant() switch
        {
            "uppercase" => text?.ToUpperInvariant() ?? string.Empty,
            "lowercase" => text?.ToLowerInvariant() ?? string.Empty,
            "trim" => text?.Trim() ?? string.Empty,
            "length" => text?.Length ?? 0,
            "json_parse" => ParseJsonObject(text),
            "json_stringify" => JsonSerializer.Serialize(inputData),
            _ => inputData ?? string.Empty
        };

        var outputData = new Dictionary<string, object>
        {
            ["result"] = result
        };

        return CreateSuccessResult(nodeConfig, outputData, resolvedInputs);
    }

    // Parses a JSON object; missing or blank text and the JSON literal null all
    // yield an empty dictionary, so a null never ends up in the output.
    private static Dictionary<string, object> ParseJsonObject(string? json)
    {
        if (string.IsNullOrWhiteSpace(json))
        {
            return new Dictionary<string, object>();
        }

        return JsonSerializer.Deserialize<Dictionary<string, object>>(json)
            ?? new Dictionary<string, object>();
    }
}
Робить виклики HTTP API з повним налаштуванням:
/// <summary>
/// Workflow node that waits for a configurable number of milliseconds.
/// </summary>
public class DelayNode : BaseWorkflowNode
{
    public override string NodeType => "Delay";

    /// <summary>
    /// Waits for the <c>durationMs</c> input (0 when the input is absent) and returns
    /// a success result with <c>delayedMs</c> and the UTC completion time
    /// <c>completedAt</c> in round-trip ("O") format.
    /// </summary>
    /// <param name="nodeConfig">Node definition whose inputs configure the delay.</param>
    /// <param name="context">Execution context used to resolve templates.</param>
    /// <param name="cancellationToken">Cancels the wait.</param>
    /// <exception cref="InvalidOperationException">
    /// <c>durationMs</c> is not an integer or is negative.
    /// </exception>
    public override async Task<NodeExecutionResult> ExecuteAsync(
        WorkflowNode nodeConfig,
        WorkflowExecutionContext context,
        CancellationToken cancellationToken = default)
    {
        var resolvedInputs = ResolveTemplates(nodeConfig.Inputs, context);
        var rawDuration = resolvedInputs.GetValueOrDefault("durationMs")?.ToString() ?? "0";

        // Parse culture-independently and report bad configuration with a clear message.
        if (!int.TryParse(
                rawDuration,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out var durationMs))
        {
            throw new InvalidOperationException(
                $"Delay node requires an integer 'durationMs' input, got '{rawDuration}'");
        }

        // Task.Delay treats -1 as "wait forever" and rejects other negative values,
        // so a negative duration is refused up front.
        if (durationMs < 0)
        {
            throw new InvalidOperationException(
                $"Delay node 'durationMs' must not be negative, got {durationMs}");
        }

        await Task.Delay(durationMs, cancellationToken);

        var outputData = new Dictionary<string, object>
        {
            ["delayedMs"] = durationMs,
            ["completedAt"] = DateTime.UtcNow.ToString("O")
        };

        return CreateSuccessResult(nodeConfig, outputData, resolvedInputs);
    }
}
TransformNode{{variable}}Проста зміна даних:BaseWorkflowNodeЗатримкаNode
/// <summary>
/// Replaces every <c>{{name}}</c> placeholder in <paramref name="template"/> with
/// the matching value from <c>context.Data</c>.
/// </summary>
/// <param name="template">Text that may contain <c>{{name}}</c> placeholders.</param>
/// <param name="context">Execution context whose <c>Data</c> supplies the values.</param>
/// <returns>
/// The resolved text. Placeholders without a matching key are left untouched, a
/// null value becomes an empty string, and a null or empty template is returned as is.
/// </returns>
protected string ResolveTemplate(string template, WorkflowExecutionContext context)
{
    if (string.IsNullOrEmpty(template))
    {
        return template;
    }

    // One pass over the ORIGINAL template: substituted values are never scanned
    // again, so a value that itself contains "{{x}}" (for example text returned by
    // an HTTP call) is inserted literally instead of being expanded a second time.
    return Regex.Replace(template, @"\{\{([^}]+)\}\}", match =>
    {
        var variable = match.Groups[1].Value.Trim();
        return context.Data.TryGetValue(variable, out var value)
            ? value?.ToString() ?? string.Empty
            : match.Value;
    });
}
Додає затримку до робочих потоків:
{
"type": "HttpRequest",
"inputs": {
"url": "{{apiBaseUrl}}/users/{{userId}}/posts",
"headers": {
"Authorization": "Bearer {{authToken}}"
}
}
}
Вузли підтримують змінні шаблонів за допомогою
/// <summary>
/// Database entity mapped to the <c>workflow_definitions</c> table; the workflow
/// itself is kept as JSON text in a <c>jsonb</c> column.
/// </summary>
[Table("workflow_definitions")]
public class WorkflowDefinitionEntity
{
    /// <summary>Integer primary key.</summary>
    [Key]
    public int Id { get; set; }

    /// <summary>Workflow identifier string; required, at most 100 characters.</summary>
    [Required, MaxLength(100)]
    public string WorkflowId { get; set; } = "";

    /// <summary>Workflow name; required, at most 200 characters.</summary>
    [Required, MaxLength(200)]
    public string Name { get; set; } = "";

    /// <summary>Workflow definition as JSON text, stored in a <c>jsonb</c> column.</summary>
    [Column(TypeName = "jsonb")]
    public string DefinitionJson { get; set; } = "";

    /// <summary>Enabled flag; <c>true</c> by default.</summary>
    public bool IsEnabled { get; set; } = true;

    /// <summary>Creation time; initialised to the current UTC time.</summary>
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;

    /// <summary>Last update time; initialised to the current UTC time.</summary>
    public DateTime UpdatedAt { get; set; } = DateTime.UtcNow;

    /// <summary>Execution entities associated with this definition.</summary>
    public ICollection<WorkflowExecutionEntity> Executions { get; set; } =
        new List<WorkflowExecutionEntity>();
}
синтаксис.
Для збереження даних ми використовуємо Entity Framework Core з PostgreSQL:
{
"id": "github-repo-workflow",
"name": "GitHub Repository Info Fetcher",
"startNodeId": "fetch-repo",
"nodes": [
{
"id": "fetch-repo",
"type": "HttpRequest",
"name": "Fetch Repository",
"inputs": {
"url": "https://api.github.com/repos/{{owner}}/{{repo}}",
"method": "GET",
"headers": {
"Accept": "application/vnd.github+json"
}
},
"outputs": {
"repoData": "{{body}}"
},
"position": { "x": 100, "y": 100 },
"style": { "backgroundColor": "#10B981", "icon": "🔍" }
},
{
"id": "extract-name",
"type": "Transform",
"name": "Extract Repo Name",
"inputs": {
"operation": "json_stringify",
"data": "{{repoData}}"
},
"position": { "x": 100, "y": 250 },
"style": { "backgroundColor": "#3B82F6", "icon": "🔄" }
}
],
"connections": [
{
"id": "conn-1",
"sourceNodeId": "fetch-repo",
"targetNodeId": "extract-name",
"sourceOutput": "default",
"label": "On Success"
}
],
"variables": {
"owner": "scottgal",
"repo": "mostlylucidweb"
}
}
Гнучкість: визначення робочого процесу можуть розвиватися без міграцій. Швидкість: JSONB у PostgreSQL індексується і придатний до запитів. Простота: немає потреби у складному реляційному відображенні.
Приклад процесу
Ми побудуємо візуальний редактор за допомогою HTMX, Alpine.js, TailwindCSS та DaisyUI.
Прекрасний інтерфейс з темамиName
Висновки
Гнучка архітектура вузлів
Mostlylucid.Workflow.Shared/Models/Mostlylucid.Workflow.Engine/Mostlylucid.Shared/Entities/Наполегливість бази даних
© 2026 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.