using Dpz.Core.MessageQueue.Models;

namespace Dpz.Core.MessageQueue.Abstractions;

/// <summary>
/// 消息 Outbox 存储接口,用于持久化消息投递状态,实现发布和消费的数据库兜底。
/// </summary>
public interface IMessageOutboxStore
{
    /// <summary>
    /// 创建一条 Outbox 记录,在消息发布前调用,初始状态为"待发布"。
    /// </summary>
    /// <param name="messageId">消息唯一标识</param>
    /// <param name="messageType">消息类型的完全限定名</param>
    /// <param name="exchange">目标 Exchange 名称</param>
    /// <param name="routingKey">路由键</param>
    /// <param name="payload">JSON 序列化的消息体</param>
    /// <param name="source">消息来源标识,可为空</param>
    /// <param name="cancellationToken">取消令牌</param>
    Task CreateAsync(
        string messageId,
        string messageType,
        string exchange,
        string routingKey,
        string payload,
        string? source,
        CancellationToken cancellationToken = default
    );

    /// <summary>
    /// 将消息标记为已成功发布到 RabbitMQ,状态变为"已发布",并记录发布时间。
    /// </summary>
    /// <param name="messageId">消息唯一标识</param>
    /// <param name="cancellationToken">取消令牌</param>
    Task MarkSentAsync(string messageId, CancellationToken cancellationToken = default);

    /// <summary>
    /// 将消息标记为发布失败,累加发布次数并按指数退避计算下次重试时间,状态变为"发布失败"。
    /// </summary>
    /// <param name="messageId">消息唯一标识</param>
    /// <param name="error">失败原因描述</param>
    /// <param name="cancellationToken">取消令牌</param>
    Task MarkPublishFailedAsync(
        string messageId,
        string error,
        CancellationToken cancellationToken = default
    );

    /// <summary>
    /// 将消息标记为已成功消费,状态变为"已消费",并记录消费时间。
    /// </summary>
    /// <param name="messageId">消息唯一标识</param>
    /// <param name="cancellationToken">取消令牌</param>
    Task MarkConsumedAsync(string messageId, CancellationToken cancellationToken = default);

    /// <summary>
    /// 将消息标记为消费失败,累加消费次数并按指数退避计算下次重试时间,状态变为"消费失败"。
    /// </summary>
    /// <param name="messageId">消息唯一标识</param>
    /// <param name="error">失败原因描述</param>
    /// <param name="cancellationToken">取消令牌</param>
    Task MarkConsumeFailedAsync(
        string messageId,
        string error,
        CancellationToken cancellationToken = default
    );

    /// <summary>
    /// 获取当前可进行发布重试的 Outbox 条目(状态为"发布失败"且退避时间已到)。
    /// </summary>
    /// <param name="batchSize">每批最大条数</param>
    /// <param name="cancellationToken">取消令牌</param>
    /// <returns>待重试的 Outbox 条目列表</returns>
    Task<IReadOnlyList<MessageOutboxEntry>> GetPendingPublishRetryAsync(
        int batchSize,
        CancellationToken cancellationToken = default
    );

    /// <summary>
    /// 获取当前可进行消费重试的 Outbox 条目(状态为"消费失败"且退避时间已到)。
    /// </summary>
    /// <param name="batchSize">每批最大条数</param>
    /// <param name="cancellationToken">取消令牌</param>
    /// <returns>待重试的 Outbox 条目列表</returns>
    Task<IReadOnlyList<MessageOutboxEntry>> GetPendingConsumeRetryAsync(
        int batchSize,
        CancellationToken cancellationToken = default
    );
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个消息Outbox模式的存储接口定义,用于实现可靠的消息传递机制。让我详细解释一下:

核心概念

Outbox模式 是一种确保分布式系统中消息可靠传递的设计模式,通过数据库持久化消息状态来解决消息丢失问题。

接口功能分析

1. 消息生命周期管理

// 消息发布流程
CreateAsync() → MarkSentAsync() / MarkPublishFailedAsync()
                      ↓
// 消息消费流程  
MarkConsumedAsync() / MarkConsumeFailedAsync()

2. 主要方法说明

创建消息记录

CreateAsync() // 消息发布前创建记录,状态为"待发布"

发布状态管理

MarkSentAsync()          // 标记为"已发布"
MarkPublishFailedAsync() // 标记为"发布失败",支持重试机制

消费状态管理

MarkConsumedAsync()      // 标记为"已消费"
MarkConsumeFailedAsync() // 标记为"消费失败",支持重试机制

重试机制

GetPendingPublishRetryAsync() // 获取待发布重试的消息
GetPendingConsumeRetryAsync() // 获取待消费重试的消息

设计特点

  1. 可靠性保障:通过数据库持久化确保消息不丢失
  2. 重试机制:失败消息支持指数退避重试
  3. 状态跟踪:完整的消息生命周期状态管理
  4. 异步操作:所有方法都是异步的,支持高并发
  5. 批量处理:重试查询支持批量操作,提高性能

使用场景

  • 微服务间通信:确保服务间消息可靠传递
  • 事务性消息:与业务事务绑定的消息发送
  • 消息补偿:失败消息的自动重试和补偿
  • 审计追踪:完整的消息处理历史记录

这个接口是构建可靠消息系统的核心抽象,实现了企业级应用中常见的消息可靠性要求。

评论加载中...