This is a viewer only at the moment see the article on how this works.
To update the preview hit Ctrl-Alt-R (or ⌘-Alt-R on Mac) or Enter to refresh. The Save icon lets you save the markdown file to disk
This is a preview from the server running through my markdig pipeline
Monday, 13 January 2025
注 注 注 注 注注:这是我忘记发行的旧文章。
在这里,享受吧!
我更新了它,但可能是我错过了一些问题。
使用现代的.NET工具来正确执行这些工具: Marten 用于事件来源, Dapper 用于最佳查询,MediatR 用于组织一切。
**并解释为何试图将事件与手动缓冲失效混为一谈,**一. 导言 导言 导言 导言 导言 导言 一,导言 导言 导言 导言 导言 导言
**CQRS(Command Query Responsibility Socureation)和事件追赶是两种特别行之有效的不同模式:**CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CRS CQRS CQRS CQRS CQRS CQRS CRS CQRS CQRS CQRS CQRS CQRS CQRS CC CQRS CQRS CC CQRS CC CQRS CC CQRS CC CQRS CC CQRS C CQRS C CC CQRS CC CQRS
将你的阅读模型与你的写模型分开
事件时间间隔
如果用像Marten这样的工具做对了, 你会得到:
每项变动的完整审计跟踪
在任何时候重建国家的能力自动读取模型预测自然适合域驱动设计
适当
// Write Model - Commands that change state
public record CreateBlogPostCommand(string Title, string Content, string AuthorId);
// Read Model - DTOs optimised for display
public class BlogPostListItemDto
{
public Guid Id { get; set; }
public string Title { get; set; }
public string AuthorName { get; set; }
public DateTime PublishedDate { get; set; }
public int CommentCount { get; set; }
}
什么是CQRS?
// Traditional: Store current state
public class BlogPost
{
public Guid Id { get; set; }
public string Title { get; set; } // Current title
public bool IsPublished { get; set; } // Current status
}
// Event Sourcing: Store the events
public record BlogPostCreated(Guid Id, string Title, string Content, DateTime CreatedAt);
public record BlogPostTitleChanged(Guid Id, string OldTitle, string NewTitle, DateTime ChangedAt);
public record BlogPostPublished(Guid Id, DateTime PublishedAt);
CQRS的核心是使用不同的读和写数据模式:
**读取面被拆分, 并优化显示 。**很简单,但真正的CQRS 意味着它们是完全分开的路径 通过你的应用程序。
**什么事件是苏奎宁?**您不保存当前状态, 而是保存导致该状态的事件 :
**当前状态通过重播事件产生。**这给了你一个完整的历史 记录你系统中发生的一切
**为什么使用 CQRS 事件缓冲 ?**完整的审计拖车
**:每个变化都有记录。**完全适合财务系统、医疗 或任何你需要证明 发生的事情和时间的地方
**”这篇博客文章与上周二有什么相似之处?**除调调
**:通过重播造成虫子的事件的确切顺序来复制虫子。**商业情报
**:根据历史数据编制新的报告,而不进行迁移。**这些事件已经存在。
CQRS 适合自然自然: 自然而然地将事件(附录事件)与读物(询问预测)分开写作(附录事件)
简单 CRUUD 简单 CRUUD:如果你只是存储和检索数据, 事件抛锚是荒谬的间接费用。缺乏经验的小型团队
学习曲线是陡峭的。
马尔坦
dotnet add package Marten
dotnet add package Marten.AspNetCore
Program.cs:
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddMarten(options =>
{
options.Connection(builder.Configuration.GetConnectionString("Marten")!);
// Register event types
options.Events.AddEventType<BlogPostCreated>();
options.Events.AddEventType<BlogPostPublished>();
options.Events.AddEventType<BlogPostTitleChanged>();
options.Events.AddEventType<CommentAdded>();
// Configure async projections
options.Projections.Add<BlogPostProjection>(ProjectionLifecycle.Async);
});
builder.Services.AddMediatR(cfg =>
cfg.RegisterServicesFromAssembly(typeof(Program).Assembly));
var app = builder.Build();
为什么是Marten?
flowchart TB
subgraph Client["Client Application"]
UI[User Interface]
end
subgraph Commands["Command Side (Writes)"]
CMD[Commands] --> CMDH[Command Handlers]
CMDH --> MARTEN[Marten Session]
MARTEN --> EVENTS[(Event Store)]
end
subgraph Background["Async Processing"]
EVENTS -.->|Event Stream| DAEMON[Marten Async Daemon]
DAEMON --> PROJ[Projections]
PROJ --> READDB[(Read Models)]
end
subgraph Queries["Query Side (Reads)"]
QRY[Queries] --> QRYH[Query Handlers with Dapper]
QRYH --> READDB
end
UI -->|Commands| CMD
UI -->|Queries| QRY
classDef commandStyle fill:none,stroke:#e63946,stroke-width:3px
classDef queryStyle fill:none,stroke:#457b9d,stroke-width:3px
classDef dataStyle fill:none,stroke:#2a9d8f,stroke-width:3px
class CMD,CMDH,MARTEN commandStyle
class QRY,QRYH queryStyle
class EVENTS,READDB dataStyle
建于 PostgreSQL (你已经知道)
这是所有事情的搭配方式:
// Always past tense - these things have happened
public record BlogPostCreated(
Guid BlogPostId,
string Title,
string Content,
string AuthorId,
DateTime CreatedAt
);
public record BlogPostPublished(
Guid BlogPostId,
DateTime PublishedAt
);
public record BlogPostTitleChanged(
Guid BlogPostId,
string OldTitle,
string NewTitle,
DateTime ChangedAt
);
public record CommentAdded(
Guid BlogPostId,
Guid CommentId,
string Author,
string Content,
DateTime CreatedAt
);
要点:
界定事件
public class BlogPost
{
// Marten requires an Id property
public Guid Id { get; set; }
// Current state (private setters)
public string Title { get; private set; } = string.Empty;
public string Content { get; private set; } = string.Empty;
public string AuthorId { get; private set; } = string.Empty;
public bool IsPublished { get; private set; }
public DateTime? PublishedDate { get; private set; }
private readonly List<Comment> _comments = new();
public IReadOnlyList<Comment> Comments => _comments.AsReadOnly();
// Apply methods - called by Marten when replaying events
public void Apply(BlogPostCreated e)
{
Id = e.BlogPostId;
Title = e.Title;
Content = e.Content;
AuthorId = e.AuthorId;
}
public void Apply(BlogPostPublished e)
{
IsPublished = true;
PublishedDate = e.PublishedAt;
}
public void Apply(BlogPostTitleChanged e)
{
Title = e.NewTitle;
}
public void Apply(CommentAdded e)
{
_comments.Add(new Comment
{
Id = e.CommentId,
Author = e.Author,
Content = e.Content,
CreatedAt = e.CreatedAt
});
}
// Business logic methods that produce events
public static BlogPostCreated Create(string title, string content, string authorId)
{
if (string.IsNullOrWhiteSpace(title))
throw new ArgumentException("Title is required");
return new BlogPostCreated(
Guid.NewGuid(),
title,
content,
authorId,
DateTime.UtcNow
);
}
public BlogPostPublished Publish()
{
if (IsPublished)
throw new InvalidOperationException("Post is already published");
return new BlogPostPublished(Id, DateTime.UtcNow);
}
public BlogPostTitleChanged ChangeTitle(string newTitle)
{
if (string.IsNullOrWhiteSpace(newTitle))
throw new ArgumentException("Title cannot be empty");
if (newTitle == Title)
throw new InvalidOperationException("New title is the same as current title");
return new BlogPostTitleChanged(Id, Title, newTitle, DateTime.UtcNow);
}
}
public class Comment
{
public Guid Id { get; set; }
public string Author { get; set; } = string.Empty;
public string Content { get; set; } = string.Empty;
public DateTime CreatedAt { get; set; }
}
事件是无法改变的记录,描述所发生的事情:
// Define commands
public record CreateBlogPostCommand(
string Title,
string Content,
string AuthorId
) : IRequest<Guid>;
public record PublishBlogPostCommand(Guid BlogPostId) : IRequest;
public record ChangeBlogPostTitleCommand(
Guid BlogPostId,
string NewTitle
) : IRequest;
// Handler for creating a blog post
public class CreateBlogPostHandler : IRequestHandler<CreateBlogPostCommand, Guid>
{
private readonly IDocumentSession _session;
public CreateBlogPostHandler(IDocumentSession session)
{
_session = session;
}
public async Task<Guid> Handle(CreateBlogPostCommand request, CancellationToken cancellationToken)
{
// Create the event
var created = BlogPost.Create(
request.Title,
request.Content,
request.AuthorId
);
// Start a new event stream
_session.Events.StartStream<BlogPost>(created.BlogPostId, created);
await _session.SaveChangesAsync(cancellationToken);
return created.BlogPostId;
}
}
// Handler for publishing
public class PublishBlogPostHandler : IRequestHandler<PublishBlogPostCommand>
{
private readonly IDocumentSession _session;
public PublishBlogPostHandler(IDocumentSession session)
{
_session = session;
}
public async Task Handle(PublishBlogPostCommand request, CancellationToken cancellationToken)
{
// Load the aggregate by replaying its events
var blogPost = await _session.Events.AggregateStreamAsync<BlogPost>(
request.BlogPostId,
token: cancellationToken
);
if (blogPost == null)
throw new InvalidOperationException($"Blog post {request.BlogPostId} not found");
// Business logic produces new event
var published = blogPost.Publish();
// Append event to the stream
_session.Events.Append(request.BlogPostId, published);
await _session.SaveChangesAsync(cancellationToken);
}
}
// Handler for changing title
public class ChangeBlogPostTitleHandler : IRequestHandler<ChangeBlogPostTitleCommand>
{
private readonly IDocumentSession _session;
public ChangeBlogPostTitleHandler(IDocumentSession session)
{
_session = session;
}
public async Task Handle(ChangeBlogPostTitleCommand request, CancellationToken cancellationToken)
{
var blogPost = await _session.Events.AggregateStreamAsync<BlogPost>(
request.BlogPostId,
token: cancellationToken
);
if (blogPost == null)
throw new InvalidOperationException($"Blog post {request.BlogPostId} not found");
var titleChanged = blogPost.ChangeTitle(request.NewTitle);
_session.Events.Append(request.BlogPostId, titleChanged);
await _session.SaveChangesAsync(cancellationToken);
}
}
商业意义丰富
模式 :
应用方法更新内部状态
// Read model - optimised for queries
public class BlogPostReadModel
{
public Guid Id { get; set; }
public string Title { get; set; } = string.Empty;
public string Content { get; set; } = string.Empty;
public string AuthorId { get; set; } = string.Empty;
public DateTime CreatedAt { get; set; }
public DateTime? PublishedAt { get; set; }
public bool IsPublished { get; set; }
public int CommentCount { get; set; }
}
// Projection - tells Marten how to build read models from events
public class BlogPostProjection : MultiStreamProjection<BlogPostReadModel, Guid>
{
public BlogPostProjection()
{
// Identity tells Marten which stream each event belongs to
Identity<BlogPostCreated>(x => x.BlogPostId);
Identity<BlogPostPublished>(x => x.BlogPostId);
Identity<BlogPostTitleChanged>(x => x.BlogPostId);
Identity<CommentAdded>(x => x.BlogPostId);
}
// Apply methods - Marten calls these to update read models
public void Apply(BlogPostReadModel view, BlogPostCreated e)
{
view.Id = e.BlogPostId;
view.Title = e.Title;
view.Content = e.Content;
view.AuthorId = e.AuthorId;
view.CreatedAt = e.CreatedAt;
view.IsPublished = false;
}
public void Apply(BlogPostReadModel view, BlogPostPublished e)
{
view.IsPublished = true;
view.PublishedAt = e.PublishedAt;
}
public void Apply(BlogPostReadModel view, BlogPostTitleChanged e)
{
view.Title = e.NewTitle;
}
public void Apply(BlogPostReadModel view, CommentAdded e)
{
view.CommentCount++;
}
}
Marten 处理事件持续和重放
将事件附加到流中 :
// Define queries
public record GetRecentBlogPostsQuery(
int Count,
bool PublishedOnly
) : IRequest<List<BlogPostListItemDto>>;
public record GetBlogPostByIdQuery(Guid Id) : IRequest<BlogPostDetailDto?>;
// DTOs for display
public class BlogPostListItemDto
{
public Guid Id { get; set; }
public string Title { get; set; } = string.Empty;
public string AuthorName { get; set; } = string.Empty;
public DateTime CreatedAt { get; set; }
public DateTime? PublishedAt { get; set; }
public int CommentCount { get; set; }
public bool IsPublished { get; set; }
}
public class BlogPostDetailDto
{
public Guid Id { get; set; }
public string Title { get; set; } = string.Empty;
public string Content { get; set; } = string.Empty;
public string AuthorId { get; set; } = string.Empty;
public string AuthorName { get; set; } = string.Empty;
public DateTime CreatedAt { get; set; }
public DateTime? PublishedAt { get; set; }
public bool IsPublished { get; set; }
public List<CommentDto> Comments { get; set; } = new();
}
public class CommentDto
{
public Guid Id { get; set; }
public string Author { get; set; } = string.Empty;
public string Content { get; set; } = string.Empty;
public DateTime CreatedAt { get; set; }
}
// Query handlers
public class GetRecentBlogPostsHandler : IRequestHandler<GetRecentBlogPostsQuery, List<BlogPostListItemDto>>
{
private readonly string _connectionString;
public GetRecentBlogPostsHandler(IConfiguration config)
{
_connectionString = config.GetConnectionString("Marten")!;
}
public async Task<List<BlogPostListItemDto>> Handle(
GetRecentBlogPostsQuery request,
CancellationToken cancellationToken)
{
await using var connection = new NpgsqlConnection(_connectionString);
// Query the Marten-generated read model table
const string sql = @"
SELECT
bp.id AS Id,
bp.title AS Title,
u.name AS AuthorName,
bp.created_at AS CreatedAt,
bp.published_at AS PublishedAt,
bp.comment_count AS CommentCount,
bp.is_published AS IsPublished
FROM blog_post_read_models bp
LEFT JOIN users u ON bp.author_id = u.id
WHERE (@PublishedOnly = false OR bp.is_published = true)
ORDER BY
CASE WHEN bp.is_published THEN bp.published_at
ELSE bp.created_at
END DESC
LIMIT @Count";
var results = await connection.QueryAsync<BlogPostListItemDto>(
sql,
new
{
PublishedOnly = request.PublishedOnly,
Count = request.Count
});
return results.ToList();
}
}
public class GetBlogPostByIdHandler : IRequestHandler<GetBlogPostByIdQuery, BlogPostDetailDto?>
{
private readonly string _connectionString;
public GetBlogPostByIdHandler(IConfiguration config)
{
_connectionString = config.GetConnectionString("Marten")!;
}
public async Task<BlogPostDetailDto?> Handle(
GetBlogPostByIdQuery request,
CancellationToken cancellationToken)
{
await using var connection = new NpgsqlConnection(_connectionString);
const string sql = @"
SELECT
bp.id AS Id,
bp.title AS Title,
bp.content AS Content,
bp.author_id AS AuthorId,
u.name AS AuthorName,
bp.created_at AS CreatedAt,
bp.published_at AS PublishedAt,
bp.is_published AS IsPublished
FROM blog_post_read_models bp
LEFT JOIN users u ON bp.author_id = u.id
WHERE bp.id = @Id";
var post = await connection.QuerySingleOrDefaultAsync<BlogPostDetailDto>(
sql,
new { request.Id });
if (post == null)
return null;
// Get comments from event stream if needed
// Or maintain a separate comment read model
return post;
}
}
流量 :
呼叫业务方法(验证和返回事件)
[ApiController]
[Route("api/[controller]")]
public class BlogPostsController : ControllerBase
{
private readonly IMediator _mediator;
public BlogPostsController(IMediator mediator)
{
_mediator = mediator;
}
[HttpGet]
public async Task<ActionResult<List<BlogPostListItemDto>>> GetRecent(
[FromQuery] int count = 10,
[FromQuery] bool publishedOnly = true)
{
var query = new GetRecentBlogPostsQuery(count, publishedOnly);
var results = await _mediator.Send(query);
return Ok(results);
}
[HttpGet("{id}")]
public async Task<ActionResult<BlogPostDetailDto>> GetById(Guid id)
{
var query = new GetBlogPostByIdQuery(id);
var result = await _mediator.Send(query);
if (result == null)
return NotFound();
return Ok(result);
}
[HttpPost]
public async Task<ActionResult<Guid>> Create([FromBody] CreateBlogPostCommand command)
{
var postId = await _mediator.Send(command);
return CreatedAtAction(nameof(GetById), new { id = postId }, postId);
}
[HttpPost("{id}/publish")]
public async Task<ActionResult> Publish(Guid id)
{
await _mediator.Send(new PublishBlogPostCommand(id));
return NoContent();
}
[HttpPut("{id}/title")]
public async Task<ActionResult> ChangeTitle(
Guid id,
[FromBody] ChangeBlogPostTitleCommand command)
{
if (id != command.BlogPostId)
return BadRequest();
await _mediator.Send(command);
return NoContent();
}
}
保存更改
sequenceDiagram
participant Client
participant Controller
participant MediatR
participant CommandHandler
participant Marten
participant EventStore
participant AsyncDaemon
participant ReadDB
participant QueryHandler
Note over Client,ReadDB: Write Operation
Client->>Controller: POST /api/blogposts
Controller->>MediatR: Send CreateBlogPostCommand
MediatR->>CommandHandler: Handle command
CommandHandler->>CommandHandler: Validate & create event
CommandHandler->>Marten: StartStream(event)
Marten->>EventStore: Append event
EventStore-->>Marten: Success
Marten-->>CommandHandler: Success
CommandHandler-->>Controller: Return ID
Controller-->>Client: 201 Created
Note over AsyncDaemon,ReadDB: Background Processing
EventStore->>AsyncDaemon: New event available
AsyncDaemon->>AsyncDaemon: Apply projection
AsyncDaemon->>ReadDB: Update read model
ReadDB-->>AsyncDaemon: Updated
Note over Client,ReadDB: Read Operation
Client->>Controller: GET /api/blogposts
Controller->>MediatR: Send Query
MediatR->>QueryHandler: Handle query
QueryHandler->>ReadDB: SELECT with Dapper
ReadDB-->>QueryHandler: Return data
QueryHandler-->>Controller: Return DTOs
Controller-->>Client: 200 OK
阅读模型和预测
预测将事件变成变幻莫测的阅读模型:
builder.Services.AddMarten(options =>
{
// This projection runs synchronously
options.Projections.Add<CriticalDataProjection>(ProjectionLifecycle.Inline);
// This projection runs async
options.Projections.Add<BlogPostProjection>(ProjectionLifecycle.Async);
});
Marten 的 Async 守护进程在背景中处理事件, 并不断更新阅读模型 。
var blogPost = await _session.Events.AggregateStreamAsync<BlogPost>(id);
与 Dapper 的查询边
使用 MeditR, 控制器非常简单 :
// Command handler
public class CreateBlogPostHandler : IRequestHandler<CreateBlogPostCommand, int>
{
private readonly ApplicationDbContext _context;
private readonly IMemoryCache _cache;
public async Task<int> Handle(CreateBlogPostCommand request, CancellationToken cancellationToken)
{
var blogPost = new BlogPost
{
Title = request.Title,
Content = request.Content,
AuthorId = request.AuthorId,
PublishedDate = DateTime.UtcNow
};
_context.BlogPosts.Add(blogPost);
await _context.SaveChangesAsync(cancellationToken);
// Manual cache invalidation - this is the tedious bit
_cache.Remove("recent-posts");
_cache.Remove($"author-posts-{request.AuthorId}");
_cache.Remove($"post-{blogPost.Id}");
return blogPost.Id;
}
}
// Query handler
public class GetRecentPostsHandler : IRequestHandler<GetRecentPostsQuery, List<BlogPostDto>>
{
private readonly string _connectionString;
private readonly IMemoryCache _cache;
public async Task<List<BlogPostDto>> Handle(GetRecentPostsQuery request, CancellationToken cancellationToken)
{
var cacheKey = "recent-posts";
if (_cache.TryGetValue<List<BlogPostDto>>(cacheKey, out var cached))
return cached!;
// Cache miss - query with Dapper
using var connection = new NpgsqlConnection(_connectionString);
var posts = (await connection.QueryAsync<BlogPostDto>(
"SELECT id, title, author_name, published_date FROM blog_posts ORDER BY published_date DESC LIMIT 10"
)).ToList();
_cache.Set(cacheKey, posts, TimeSpan.FromMinutes(5));
return posts;
}
}
**第2部分: " 半处理法 " ( " 无效处理 " )**对,所以你读到过 有关适当事件 搜捕,你正在想,"这是很多的工作。"
够公平了这是“非正式的CQRS”方法, 大部分团队都实际使用。
设置将命令写入数据库( 使用 EF 或 Dapper)
从 I MemoryCache 或 I 分布缓存中查询写入后命令将相关缓存条目无效
没有事件来源, 没有预测, 没有同步守护进程这不是真实的CQRS。
你得不到审计线索 时间查询 或自动预测
快速示例
简单应用程序,没有复杂的审计要求
您需要性能,但无法证明您需要性能 但无法证明您需要性能
几秒钟的腐烂是可以接受的问题
缓存无效是困难的:错过一个缓存密钥,你就提供老化的数据。
**每个命令都需要知道哪些缓存可以失效 。**无审计线索
**:你只有现在的状态。**无法证明发生的事和时间
无时间查询:不能问 "昨天的系统长什么样?"
:用 I MemoryCache ,每个服务器都有自己的缓存。
更改查询, 可能会断开命令 。
第3部分:最坏想法 -- -- 事件抛浮+手工遗漏
// Don't do this!
public class PublishBlogPostHandler : IRequestHandler<PublishBlogPostCommand>
{
private readonly IDocumentSession _session;
private readonly IMemoryCache _cache; // ← BAD
public async Task Handle(PublishBlogPostCommand request, CancellationToken cancellationToken)
{
var blogPost = await _session.Events.AggregateStreamAsync<BlogPost>(request.BlogPostId);
var published = blogPost.Publish();
_session.Events.Append(request.BlogPostId, published);
await _session.SaveChangesAsync(cancellationToken);
// Manually invalidating cache while using Event Sourcing ← TERRIBLE IDEA
_cache.Remove($"post-{request.BlogPostId}");
_cache.Remove("recent-posts");
// Now you have:
// 1. Event in event store
// 2. Cache invalidated
// 3. But projection hasn't run yet!
// Queries will hit database before projection completes = stale data
}
}
为什么人们尝试这个
所以他们认为"我只增加缓存失效功能 让阅读速度更快 更加一致"
为何如此可怕
您现在有两个系统 保持阅读模型同步 Marten的预测 以及您的手动缓存失效
他们失去同步了
调试恶梦
正确方法
使用 Marten 的预测( 同步或内联)
直接查询阅读模型接受最终的一致性(通常罚款)如果您真的需要即时一致性, 请使用内线预测
© 2026 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.