现代 CQRS 和 .NET 中的事件: 正确操作 (中文 (Chinese Simplified))

现代 CQRS 和 .NET 中的事件: 正确操作

Monday, 13 January 2025

//

13 minute read

注 注 注 注 注注:这是我忘记发行的旧文章。

在这里,享受吧!

我更新了它,但可能是我错过了一些问题。

CQRS 和 Enterprises Socucing -- -- 两种模式经常被一起提及,但常常被误解。

使用现代的.NET工具来正确执行这些工具: Marten 用于事件来源, Dapper 用于最佳查询,MediatR 用于组织一切。

  • **并解释为何试图将事件与手动缓冲失效混为一谈,**一. 导言 导言 导言 导言 导言 导言 一,导言 导言 导言 导言 导言 导言

  • **CQRS(Command Query Responsibility Socureation)和事件追赶是两种特别行之有效的不同模式:**CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CQRS CRS CQRS CQRS CQRS CQRS CQRS CRS CQRS CQRS CQRS CQRS CQRS CQRS CC CQRS CQRS CC CQRS CC CQRS CC CQRS CC CQRS CC CQRS C CQRS C CC CQRS CC CQRS

  • 将你的阅读模型与你的写模型分开

  • 事件时间间隔

    • 将每个州的变化 都作为一连串事件
  • 如果用像Marten这样的工具做对了, 你会得到:

  • 每项变动的完整审计跟踪

在任何时候重建国家的能力自动读取模型预测自然适合域驱动设计

本条的重点是这样做

适当

// Write Model - Commands that change state
public record CreateBlogPostCommand(string Title, string Content, string AuthorId);

// Read Model - DTOs optimised for display
public class BlogPostListItemDto
{
    public Guid Id { get; set; }
    public string Title { get; set; }
    public string AuthorName { get; set; }
    public DateTime PublishedDate { get; set; }
    public int CommentCount { get; set; }
}

如果你想要快速和脏兮兮的缓冲法的话, 我会在结尾处简短地说一说, 但其实不是CQRS, 而且它不会给您带来 事件保温的好处。

什么是CQRS?

// Traditional: Store current state
public class BlogPost
{
    public Guid Id { get; set; }
    public string Title { get; set; }  // Current title
    public bool IsPublished { get; set; }  // Current status
}

// Event Sourcing: Store the events
public record BlogPostCreated(Guid Id, string Title, string Content, DateTime CreatedAt);
public record BlogPostTitleChanged(Guid Id, string OldTitle, string NewTitle, DateTime ChangedAt);
public record BlogPostPublished(Guid Id, DateTime PublishedAt);

CQRS的核心是使用不同的读和写数据模式:

书写部分侧重于业务逻辑和验证。

**读取面被拆分, 并优化显示 。**很简单,但真正的CQRS 意味着它们是完全分开的路径 通过你的应用程序。

**什么事件是苏奎宁?**您不保存当前状态, 而是保存导致该状态的事件 :

**当前状态通过重播事件产生。**这给了你一个完整的历史 记录你系统中发生的一切

**为什么使用 CQRS 事件缓冲 ?**完整的审计拖车

**:每个变化都有记录。**完全适合财务系统、医疗 或任何你需要证明 发生的事情和时间的地方

时间查询

**”这篇博客文章与上周二有什么相似之处?**除调调

**:通过重播造成虫子的事件的确切顺序来复制虫子。**商业情报

**:根据历史数据编制新的报告,而不进行迁移。**这些事件已经存在。

CQRS 适合自然自然: 自然而然地将事件(附录事件)与读物(询问预测)分开写作(附录事件)

当你不应该

简单 CRUUD 简单 CRUUD:如果你只是存储和检索数据, 事件抛锚是荒谬的间接费用。缺乏经验的小型团队

学习曲线是陡峭的。

  • 无审计要求
  • :如果你只关心现状, 不要保存历史 。
  • 大型二进制数据
  • :活动在图像、视频、文件方面效果不佳。
  • 现代事件 .NET: Marten

2025年在.NET .NET 中进行事件搜索的最佳工具是

马尔坦

dotnet add package Marten
dotnet add package Marten.AspNetCore

Program.cs:

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddMarten(options =>
{
    options.Connection(builder.Configuration.GetConnectionString("Marten")!);

    // Register event types
    options.Events.AddEventType<BlogPostCreated>();
    options.Events.AddEventType<BlogPostPublished>();
    options.Events.AddEventType<BlogPostTitleChanged>();
    options.Events.AddEventType<CommentAdded>();

    // Configure async projections
    options.Projections.Add<BlogPostProjection>(ProjectionLifecycle.Async);
});

builder.Services.AddMediatR(cfg =>
    cfg.RegisterServicesFromAssembly(typeof(Program).Assembly));

var app = builder.Build();

这是一家以PostgreSQL为基础的活动商店, 由Jeremy Miller积极维护, 它只是工作而已。

为什么是Marten?

flowchart TB
    subgraph Client["Client Application"]
        UI[User Interface]
    end

    subgraph Commands["Command Side (Writes)"]
        CMD[Commands] --> CMDH[Command Handlers]
        CMDH --> MARTEN[Marten Session]
        MARTEN --> EVENTS[(Event Store)]
    end

    subgraph Background["Async Processing"]
        EVENTS -.->|Event Stream| DAEMON[Marten Async Daemon]
        DAEMON --> PROJ[Projections]
        PROJ --> READDB[(Read Models)]
    end

    subgraph Queries["Query Side (Reads)"]
        QRY[Queries] --> QRYH[Query Handlers with Dapper]
        QRYH --> READDB
    end

    UI -->|Commands| CMD
    UI -->|Queries| QRY

    classDef commandStyle fill:none,stroke:#e63946,stroke-width:3px
    classDef queryStyle fill:none,stroke:#457b9d,stroke-width:3px
    classDef dataStyle fill:none,stroke:#2a9d8f,stroke-width:3px

    class CMD,CMDH,MARTEN commandStyle
    class QRY,QRYH queryStyle
    class EVENTS,READDB dataStyle

建于 PostgreSQL (你已经知道)

  • 从事件到阅读模型的自动预测用于投影处理的 Async 守护程序
  • 丰富查询能力生产准备和作战试验
  • 设置Marten安装软件包 :
  • 配置配置在

建筑结构

这是所有事情的搭配方式:

// Always past tense - these things have happened
public record BlogPostCreated(
    Guid BlogPostId,
    string Title,
    string Content,
    string AuthorId,
    DateTime CreatedAt
);

public record BlogPostPublished(
    Guid BlogPostId,
    DateTime PublishedAt
);

public record BlogPostTitleChanged(
    Guid BlogPostId,
    string OldTitle,
    string NewTitle,
    DateTime ChangedAt
);

public record CommentAdded(
    Guid BlogPostId,
    Guid CommentId,
    string Author,
    string Content,
    DateTime CreatedAt
);

要点:

  • 命令事件存储的附加事件
  • Async 守护守护程序读取模型
  • 查询读取自非正常读取模型

不需要手动缓存失效 !

界定事件

public class BlogPost
{
    // Marten requires an Id property
    public Guid Id { get; set; }

    // Current state (private setters)
    public string Title { get; private set; } = string.Empty;
    public string Content { get; private set; } = string.Empty;
    public string AuthorId { get; private set; } = string.Empty;
    public bool IsPublished { get; private set; }
    public DateTime? PublishedDate { get; private set; }
    private readonly List<Comment> _comments = new();
    public IReadOnlyList<Comment> Comments => _comments.AsReadOnly();

    // Apply methods - called by Marten when replaying events
    public void Apply(BlogPostCreated e)
    {
        Id = e.BlogPostId;
        Title = e.Title;
        Content = e.Content;
        AuthorId = e.AuthorId;
    }

    public void Apply(BlogPostPublished e)
    {
        IsPublished = true;
        PublishedDate = e.PublishedAt;
    }

    public void Apply(BlogPostTitleChanged e)
    {
        Title = e.NewTitle;
    }

    public void Apply(CommentAdded e)
    {
        _comments.Add(new Comment
        {
            Id = e.CommentId,
            Author = e.Author,
            Content = e.Content,
            CreatedAt = e.CreatedAt
        });
    }

    // Business logic methods that produce events
    public static BlogPostCreated Create(string title, string content, string authorId)
    {
        if (string.IsNullOrWhiteSpace(title))
            throw new ArgumentException("Title is required");

        return new BlogPostCreated(
            Guid.NewGuid(),
            title,
            content,
            authorId,
            DateTime.UtcNow
        );
    }

    public BlogPostPublished Publish()
    {
        if (IsPublished)
            throw new InvalidOperationException("Post is already published");

        return new BlogPostPublished(Id, DateTime.UtcNow);
    }

    public BlogPostTitleChanged ChangeTitle(string newTitle)
    {
        if (string.IsNullOrWhiteSpace(newTitle))
            throw new ArgumentException("Title cannot be empty");

        if (newTitle == Title)
            throw new InvalidOperationException("New title is the same as current title");

        return new BlogPostTitleChanged(Id, Title, newTitle, DateTime.UtcNow);
    }
}

public class Comment
{
    public Guid Id { get; set; }
    public string Author { get; set; } = string.Empty;
    public string Content { get; set; } = string.Empty;
    public DateTime CreatedAt { get; set; }
}

事件是无法改变的记录,描述所发生的事情:

  1. 活动应该是:
  2. 不易变
    • 写过一次,从未改变过

过去时态

  • 描述发生的事情
// Define commands
public record CreateBlogPostCommand(
    string Title,
    string Content,
    string AuthorId
) : IRequest<Guid>;

public record PublishBlogPostCommand(Guid BlogPostId) : IRequest;

public record ChangeBlogPostTitleCommand(
    Guid BlogPostId,
    string NewTitle
) : IRequest;

// Handler for creating a blog post
public class CreateBlogPostHandler : IRequestHandler<CreateBlogPostCommand, Guid>
{
    private readonly IDocumentSession _session;

    public CreateBlogPostHandler(IDocumentSession session)
    {
        _session = session;
    }

    public async Task<Guid> Handle(CreateBlogPostCommand request, CancellationToken cancellationToken)
    {
        // Create the event
        var created = BlogPost.Create(
            request.Title,
            request.Content,
            request.AuthorId
        );

        // Start a new event stream
        _session.Events.StartStream<BlogPost>(created.BlogPostId, created);

        await _session.SaveChangesAsync(cancellationToken);

        return created.BlogPostId;
    }
}

// Handler for publishing
public class PublishBlogPostHandler : IRequestHandler<PublishBlogPostCommand>
{
    private readonly IDocumentSession _session;

    public PublishBlogPostHandler(IDocumentSession session)
    {
        _session = session;
    }

    public async Task Handle(PublishBlogPostCommand request, CancellationToken cancellationToken)
    {
        // Load the aggregate by replaying its events
        var blogPost = await _session.Events.AggregateStreamAsync<BlogPost>(
            request.BlogPostId,
            token: cancellationToken
        );

        if (blogPost == null)
            throw new InvalidOperationException($"Blog post {request.BlogPostId} not found");

        // Business logic produces new event
        var published = blogPost.Publish();

        // Append event to the stream
        _session.Events.Append(request.BlogPostId, published);

        await _session.SaveChangesAsync(cancellationToken);
    }
}

// Handler for changing title
public class ChangeBlogPostTitleHandler : IRequestHandler<ChangeBlogPostTitleCommand>
{
    private readonly IDocumentSession _session;

    public ChangeBlogPostTitleHandler(IDocumentSession session)
    {
        _session = session;
    }

    public async Task Handle(ChangeBlogPostTitleCommand request, CancellationToken cancellationToken)
    {
        var blogPost = await _session.Events.AggregateStreamAsync<BlogPost>(
            request.BlogPostId,
            token: cancellationToken
        );

        if (blogPost == null)
            throw new InvalidOperationException($"Blog post {request.BlogPostId} not found");

        var titleChanged = blogPost.ChangeTitle(request.NewTitle);

        _session.Events.Append(request.BlogPostId, titleChanged);

        await _session.SaveChangesAsync(cancellationToken);
    }
}

商业意义丰富

    • "Blog Posttitle changed"不是"财产更新"
  1. 创建总计
  2. 总计是写作模型。
  3. 它们验证业务规则并举办活动:

模式 :

业务方法验证和返回活动

应用方法更新内部状态

// Read model - optimised for queries
public class BlogPostReadModel
{
    public Guid Id { get; set; }
    public string Title { get; set; } = string.Empty;
    public string Content { get; set; } = string.Empty;
    public string AuthorId { get; set; } = string.Empty;
    public DateTime CreatedAt { get; set; }
    public DateTime? PublishedAt { get; set; }
    public bool IsPublished { get; set; }
    public int CommentCount { get; set; }
}

// Projection - tells Marten how to build read models from events
public class BlogPostProjection : MultiStreamProjection<BlogPostReadModel, Guid>
{
    public BlogPostProjection()
    {
        // Identity tells Marten which stream each event belongs to
        Identity<BlogPostCreated>(x => x.BlogPostId);
        Identity<BlogPostPublished>(x => x.BlogPostId);
        Identity<BlogPostTitleChanged>(x => x.BlogPostId);
        Identity<CommentAdded>(x => x.BlogPostId);
    }

    // Apply methods - Marten calls these to update read models
    public void Apply(BlogPostReadModel view, BlogPostCreated e)
    {
        view.Id = e.BlogPostId;
        view.Title = e.Title;
        view.Content = e.Content;
        view.AuthorId = e.AuthorId;
        view.CreatedAt = e.CreatedAt;
        view.IsPublished = false;
    }

    public void Apply(BlogPostReadModel view, BlogPostPublished e)
    {
        view.IsPublished = true;
        view.PublishedAt = e.PublishedAt;
    }

    public void Apply(BlogPostReadModel view, BlogPostTitleChanged e)
    {
        view.Title = e.NewTitle;
    }

    public void Apply(BlogPostReadModel view, CommentAdded e)
    {
        view.CommentCount++;
    }
}

Marten 处理事件持续和重放

命令处理器( write 侧)

将事件附加到流中 :

// Define queries
public record GetRecentBlogPostsQuery(
    int Count,
    bool PublishedOnly
) : IRequest<List<BlogPostListItemDto>>;

public record GetBlogPostByIdQuery(Guid Id) : IRequest<BlogPostDetailDto?>;

// DTOs for display
public class BlogPostListItemDto
{
    public Guid Id { get; set; }
    public string Title { get; set; } = string.Empty;
    public string AuthorName { get; set; } = string.Empty;
    public DateTime CreatedAt { get; set; }
    public DateTime? PublishedAt { get; set; }
    public int CommentCount { get; set; }
    public bool IsPublished { get; set; }
}

public class BlogPostDetailDto
{
    public Guid Id { get; set; }
    public string Title { get; set; } = string.Empty;
    public string Content { get; set; } = string.Empty;
    public string AuthorId { get; set; } = string.Empty;
    public string AuthorName { get; set; } = string.Empty;
    public DateTime CreatedAt { get; set; }
    public DateTime? PublishedAt { get; set; }
    public bool IsPublished { get; set; }
    public List<CommentDto> Comments { get; set; } = new();
}

public class CommentDto
{
    public Guid Id { get; set; }
    public string Author { get; set; } = string.Empty;
    public string Content { get; set; } = string.Empty;
    public DateTime CreatedAt { get; set; }
}

// Query handlers
public class GetRecentBlogPostsHandler : IRequestHandler<GetRecentBlogPostsQuery, List<BlogPostListItemDto>>
{
    private readonly string _connectionString;

    public GetRecentBlogPostsHandler(IConfiguration config)
    {
        _connectionString = config.GetConnectionString("Marten")!;
    }

    public async Task<List<BlogPostListItemDto>> Handle(
        GetRecentBlogPostsQuery request,
        CancellationToken cancellationToken)
    {
        await using var connection = new NpgsqlConnection(_connectionString);

        // Query the Marten-generated read model table
        const string sql = @"
            SELECT
                bp.id AS Id,
                bp.title AS Title,
                u.name AS AuthorName,
                bp.created_at AS CreatedAt,
                bp.published_at AS PublishedAt,
                bp.comment_count AS CommentCount,
                bp.is_published AS IsPublished
            FROM blog_post_read_models bp
            LEFT JOIN users u ON bp.author_id = u.id
            WHERE (@PublishedOnly = false OR bp.is_published = true)
            ORDER BY
                CASE WHEN bp.is_published THEN bp.published_at
                     ELSE bp.created_at
                END DESC
            LIMIT @Count";

        var results = await connection.QueryAsync<BlogPostListItemDto>(
            sql,
            new
            {
                PublishedOnly = request.PublishedOnly,
                Count = request.Count
            });

        return results.ToList();
    }
}

public class GetBlogPostByIdHandler : IRequestHandler<GetBlogPostByIdQuery, BlogPostDetailDto?>
{
    private readonly string _connectionString;

    public GetBlogPostByIdHandler(IConfiguration config)
    {
        _connectionString = config.GetConnectionString("Marten")!;
    }

    public async Task<BlogPostDetailDto?> Handle(
        GetBlogPostByIdQuery request,
        CancellationToken cancellationToken)
    {
        await using var connection = new NpgsqlConnection(_connectionString);

        const string sql = @"
            SELECT
                bp.id AS Id,
                bp.title AS Title,
                bp.content AS Content,
                bp.author_id AS AuthorId,
                u.name AS AuthorName,
                bp.created_at AS CreatedAt,
                bp.published_at AS PublishedAt,
                bp.is_published AS IsPublished
            FROM blog_post_read_models bp
            LEFT JOIN users u ON bp.author_id = u.id
            WHERE bp.id = @Id";

        var post = await connection.QuerySingleOrDefaultAsync<BlogPostDetailDto>(
            sql,
            new { request.Id });

        if (post == null)
            return null;

        // Get comments from event stream if needed
        // Or maintain a separate comment read model

        return post;
    }
}

流量 :

通过重放事件(或创建新事件)加载总和

呼叫业务方法(验证和返回事件)

[ApiController]
[Route("api/[controller]")]
public class BlogPostsController : ControllerBase
{
    private readonly IMediator _mediator;

    public BlogPostsController(IMediator mediator)
    {
        _mediator = mediator;
    }

    [HttpGet]
    public async Task<ActionResult<List<BlogPostListItemDto>>> GetRecent(
        [FromQuery] int count = 10,
        [FromQuery] bool publishedOnly = true)
    {
        var query = new GetRecentBlogPostsQuery(count, publishedOnly);
        var results = await _mediator.Send(query);
        return Ok(results);
    }

    [HttpGet("{id}")]
    public async Task<ActionResult<BlogPostDetailDto>> GetById(Guid id)
    {
        var query = new GetBlogPostByIdQuery(id);
        var result = await _mediator.Send(query);

        if (result == null)
            return NotFound();

        return Ok(result);
    }

    [HttpPost]
    public async Task<ActionResult<Guid>> Create([FromBody] CreateBlogPostCommand command)
    {
        var postId = await _mediator.Send(command);
        return CreatedAtAction(nameof(GetById), new { id = postId }, postId);
    }

    [HttpPost("{id}/publish")]
    public async Task<ActionResult> Publish(Guid id)
    {
        await _mediator.Send(new PublishBlogPostCommand(id));
        return NoContent();
    }

    [HttpPut("{id}/title")]
    public async Task<ActionResult> ChangeTitle(
        Guid id,
        [FromBody] ChangeBlogPostTitleCommand command)
    {
        if (id != command.BlogPostId)
            return BadRequest();

        await _mediator.Send(command);
        return NoContent();
    }
}

将事件附加到流流

保存更改

sequenceDiagram
    participant Client
    participant Controller
    participant MediatR
    participant CommandHandler
    participant Marten
    participant EventStore
    participant AsyncDaemon
    participant ReadDB
    participant QueryHandler

    Note over Client,ReadDB: Write Operation
    Client->>Controller: POST /api/blogposts
    Controller->>MediatR: Send CreateBlogPostCommand
    MediatR->>CommandHandler: Handle command
    CommandHandler->>CommandHandler: Validate & create event
    CommandHandler->>Marten: StartStream(event)
    Marten->>EventStore: Append event
    EventStore-->>Marten: Success
    Marten-->>CommandHandler: Success
    CommandHandler-->>Controller: Return ID
    Controller-->>Client: 201 Created

    Note over AsyncDaemon,ReadDB: Background Processing
    EventStore->>AsyncDaemon: New event available
    AsyncDaemon->>AsyncDaemon: Apply projection
    AsyncDaemon->>ReadDB: Update read model
    ReadDB-->>AsyncDaemon: Updated

    Note over Client,ReadDB: Read Operation
    Client->>Controller: GET /api/blogposts
    Controller->>MediatR: Send Query
    MediatR->>QueryHandler: Handle query
    QueryHandler->>ReadDB: SELECT with Dapper
    ReadDB-->>QueryHandler: Return data
    QueryHandler-->>Controller: Return DTOs
    Controller-->>Client: 200 OK

Marten负责其他的 - 储存事件,触发预测,等等。

阅读模型和预测

预测将事件变成变幻莫测的阅读模型:

builder.Services.AddMarten(options =>
{
    // This projection runs synchronously
    options.Projections.Add<CriticalDataProjection>(ProjectionLifecycle.Inline);

    // This projection runs async
    options.Projections.Add<BlogPostProjection>(ProjectionLifecycle.Async);
});

Marten 的 Async 守护进程在背景中处理事件, 并不断更新阅读模型 。

var blogPost = await _session.Events.AggregateStreamAsync<BlogPost>(id);

你不写任何缓存失效代码 - 这是自动的

与 Dapper 的查询边

现在我们使用 Dapper 来查询读取模型的最大性能 :

  • 没有缓存代码 。
  • 无效逻辑。
  • Marten让读数模型自动同步
  • 主计长

使用 MeditR, 控制器非常简单 :

完整的流动

// Command handler
public class CreateBlogPostHandler : IRequestHandler<CreateBlogPostCommand, int>
{
    private readonly ApplicationDbContext _context;
    private readonly IMemoryCache _cache;

    public async Task<int> Handle(CreateBlogPostCommand request, CancellationToken cancellationToken)
    {
        var blogPost = new BlogPost
        {
            Title = request.Title,
            Content = request.Content,
            AuthorId = request.AuthorId,
            PublishedDate = DateTime.UtcNow
        };

        _context.BlogPosts.Add(blogPost);
        await _context.SaveChangesAsync(cancellationToken);

        // Manual cache invalidation - this is the tedious bit
        _cache.Remove("recent-posts");
        _cache.Remove($"author-posts-{request.AuthorId}");
        _cache.Remove($"post-{blogPost.Id}");

        return blogPost.Id;
    }
}

// Query handler
public class GetRecentPostsHandler : IRequestHandler<GetRecentPostsQuery, List<BlogPostDto>>
{
    private readonly string _connectionString;
    private readonly IMemoryCache _cache;

    public async Task<List<BlogPostDto>> Handle(GetRecentPostsQuery request, CancellationToken cancellationToken)
    {
        var cacheKey = "recent-posts";

        if (_cache.TryGetValue<List<BlogPostDto>>(cacheKey, out var cached))
            return cached!;

        // Cache miss - query with Dapper
        using var connection = new NpgsqlConnection(_connectionString);
        var posts = (await connection.QueryAsync<BlogPostDto>(
            "SELECT id, title, author_name, published_date FROM blog_posts ORDER BY published_date DESC LIMIT 10"
        )).ToList();

        _cache.Set(cacheKey, posts, TimeSpan.FromMinutes(5));
        return posts;
    }
}

以下是所有这一切是如何一起工作的:

  • 最终一致性
  • 需要理解的一件事:随着同步预测, 写一个事件和阅读模型更新之间 稍有延迟。
  • 通常为毫秒,但就在那里
  • 如果具体操作需要立即保持一致,请使用内线预测:

或者直接查询事件流, 用于读写后的设想方案 :

**第2部分: " 半处理法 " ( " 无效处理 " )**对,所以你读到过 有关适当事件 搜捕,你正在想,"这是很多的工作。"

够公平了这是“非正式的CQRS”方法, 大部分团队都实际使用。

设置将命令写入数据库( 使用 EF 或 Dapper)

从 I MemoryCache 或 I 分布缓存中查询写入后命令将相关缓存条目无效

没有事件来源, 没有预测, 没有同步守护进程这不是真实的CQRS。

你得不到审计线索 时间查询 或自动预测

但你的性能却不那么复杂

快速示例

当此工程完成时

简单应用程序,没有复杂的审计要求

您需要性能,但无法证明您需要性能 但无法证明您需要性能

不想学习的小型团队

几秒钟的腐烂是可以接受的问题

缓存无效是困难的:错过一个缓存密钥,你就提供老化的数据。

**每个命令都需要知道哪些缓存可以失效 。**无审计线索

**:你只有现在的状态。**无法证明发生的事和时间

无时间查询:不能问 "昨天的系统长什么样?"

缩缩缩

:用 I MemoryCache ,每个服务器都有自己的缓存。

  • 在服务器 A 上更新, 服务器 B 仍然有老化的数据 。
  • IdispulteCache 解决这个问题, 但添加 Redis 。
  • 紧密连接
  • :命令与缓存密钥连接。

更改查询, 可能会断开命令 。

  • 管用,但你正在用简单的交易 来弥补失去的能力
  • 有时候,这是正确的权衡。
  • 经常不是

第3部分:最坏想法 -- -- 事件抛浮+手工遗漏

现在,真正可怕的想法是: 使用事件递解, 但也试图手动 无效的缓存 。

// Don't do this!
public class PublishBlogPostHandler : IRequestHandler<PublishBlogPostCommand>
{
    private readonly IDocumentSession _session;
    private readonly IMemoryCache _cache;  // ← BAD

    public async Task Handle(PublishBlogPostCommand request, CancellationToken cancellationToken)
    {
        var blogPost = await _session.Events.AggregateStreamAsync<BlogPost>(request.BlogPostId);
        var published = blogPost.Publish();

        _session.Events.Append(request.BlogPostId, published);
        await _session.SaveChangesAsync(cancellationToken);

        // Manually invalidating cache while using Event Sourcing ← TERRIBLE IDEA
        _cache.Remove($"post-{request.BlogPostId}");
        _cache.Remove("recent-posts");

        // Now you have:
        // 1. Event in event store
        // 2. Cache invalidated
        // 3. But projection hasn't run yet!
        // Queries will hit database before projection completes = stale data
    }
}

为什么人们尝试这个

他们想要Enter Sourcing的审计线索和时间查询 但他们担心最终的一致性

所以他们认为"我只增加缓存失效功能 让阅读速度更快 更加一致"

不要这样做。

为何如此可怕

  • 你辜负了目的
  • :通过预测已经建立的阅读模型。
  • 添加手动缓存无效表示您正在绕过系统 。
  • 双倍复杂复杂程度

您现在有两个系统 保持阅读模型同步 Marten的预测 以及您的手动缓存失效

  • 他们将发生冲突。
  • 不一致状态
  • : Marten 的 Async 守护进程更新数据库 。

您的缓存无效立即运行 。

他们失去同步了

  • 这是正确的吗?
  • 损失的养恤金
  • : 事件Sourcing的要点是,预测来自事件。
  • 手动缓存失效中断该模型 。

调试恶梦

  • :老化的数据是不是因为预测没有运行?
  • 还是因为你忘了取消一个缓存密钥?
  • 还是因为缓存失效 但投影尚未运行?

祝你好运

正确方法

  • 如果您需要“事件保护 ” :

使用 Marten 的预测( 同步或内联)

直接查询阅读模型接受最终的一致性(通常罚款)如果您真的需要即时一致性, 请使用内线预测

如果你无法接受最终的一致性:

Finding related posts...
logo

© 2026 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.