using Dpz.Core.MessageQueue.Models;
namespace Dpz.Core.MessageQueue.Abstractions;
/// <summary>
/// 消息 Outbox 存储接口,用于持久化消息投递状态,实现发布和消费的数据库兜底。
/// </summary>
public interface IMessageOutboxStore
{
/// <summary>
/// 创建一条 Outbox 记录,在消息发布前调用,初始状态为"待发布"。
/// </summary>
/// <param name="messageId">消息唯一标识</param>
/// <param name="messageType">消息类型的完全限定名</param>
/// <param name="exchange">目标 Exchange 名称</param>
/// <param name="routingKey">路由键</param>
/// <param name="payload">JSON 序列化的消息体</param>
/// <param name="source">消息来源标识,可为空</param>
/// <param name="cancellationToken">取消令牌</param>
Task CreateAsync(
string messageId,
string messageType,
string exchange,
string routingKey,
string payload,
string? source,
CancellationToken cancellationToken = default
);
/// <summary>
/// 将消息标记为已成功发布到 RabbitMQ,状态变为"已发布",并记录发布时间。
/// </summary>
/// <param name="messageId">消息唯一标识</param>
/// <param name="cancellationToken">取消令牌</param>
Task MarkSentAsync(string messageId, CancellationToken cancellationToken = default);
/// <summary>
/// 将消息标记为发布失败,累加发布次数并按指数退避计算下次重试时间,状态变为"发布失败"。
/// </summary>
/// <param name="messageId">消息唯一标识</param>
/// <param name="error">失败原因描述</param>
/// <param name="cancellationToken">取消令牌</param>
Task MarkPublishFailedAsync(
string messageId,
string error,
CancellationToken cancellationToken = default
);
/// <summary>
/// 将消息标记为已成功消费,状态变为"已消费",并记录消费时间。
/// </summary>
/// <param name="messageId">消息唯一标识</param>
/// <param name="cancellationToken">取消令牌</param>
Task MarkConsumedAsync(string messageId, CancellationToken cancellationToken = default);
/// <summary>
/// 将消息标记为消费失败,累加消费次数并按指数退避计算下次重试时间,状态变为"消费失败"。
/// </summary>
/// <param name="messageId">消息唯一标识</param>
/// <param name="error">失败原因描述</param>
/// <param name="cancellationToken">取消令牌</param>
Task MarkConsumeFailedAsync(
string messageId,
string error,
CancellationToken cancellationToken = default
);
/// <summary>
/// 获取当前可进行发布重试的 Outbox 条目(状态为"发布失败"且退避时间已到)。
/// </summary>
/// <param name="batchSize">每批最大条数</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>待重试的 Outbox 条目列表</returns>
Task<IReadOnlyList<MessageOutboxEntry>> GetPendingPublishRetryAsync(
int batchSize,
CancellationToken cancellationToken = default
);
/// <summary>
/// 获取当前可进行消费重试的 Outbox 条目(状态为"消费失败"且退避时间已到)。
/// </summary>
/// <param name="batchSize">每批最大条数</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>待重试的 Outbox 条目列表</returns>
Task<IReadOnlyList<MessageOutboxEntry>> GetPendingConsumeRetryAsync(
int batchSize,
CancellationToken cancellationToken = default
);
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个消息Outbox模式的存储接口定义,用于实现可靠的消息传递机制。让我详细解释一下:
核心概念
Outbox模式 是一种确保分布式系统中消息可靠传递的设计模式,通过数据库持久化消息状态来解决消息丢失问题。
接口功能分析
1. 消息生命周期管理
// 消息发布流程
CreateAsync() → MarkSentAsync() / MarkPublishFailedAsync()
↓
// 消息消费流程
MarkConsumedAsync() / MarkConsumeFailedAsync()
2. 主要方法说明
创建消息记录
CreateAsync() // 消息发布前创建记录,状态为"待发布"
发布状态管理
MarkSentAsync() // 标记为"已发布"
MarkPublishFailedAsync() // 标记为"发布失败",支持重试机制
消费状态管理
MarkConsumedAsync() // 标记为"已消费"
MarkConsumeFailedAsync() // 标记为"消费失败",支持重试机制
重试机制
GetPendingPublishRetryAsync() // 获取待发布重试的消息
GetPendingConsumeRetryAsync() // 获取待消费重试的消息
设计特点
- 可靠性保障:通过数据库持久化确保消息不丢失
- 重试机制:失败消息支持指数退避重试
- 状态跟踪:完整的消息生命周期状态管理
- 异步操作:所有方法都是异步的,支持高并发
- 批量处理:重试查询支持批量操作,提高性能
使用场景
- 微服务间通信:确保服务间消息可靠传递
- 事务性消息:与业务事务绑定的消息发送
- 消息补偿:失败消息的自动重试和补偿
- 审计追踪:完整的消息处理历史记录
这个接口是构建可靠消息系统的核心抽象,实现了企业级应用中常见的消息可靠性要求。
评论加载中...