using Dpz.Core.MessageQueue.Models;
using Dpz.Core.Public.ViewModel.Request;
using Dpz.Core.Public.ViewModel.Response;

namespace Dpz.Core.Service.RepositoryServiceImpl;

/// <summary>
/// MongoDB-backed store for message outbox records. It creates records, updates their
/// publish/consume status, returns records that are due for a retry, and serves the
/// paged listing together with cached filter options.
/// </summary>
public sealed class MongoMessageOutboxStore(
    IRepository<MessageOutboxRecord> repository,
    ILogger<MongoMessageOutboxStore> logger,
    IFusionCache fusionCache
) : AbstractCacheService(fusionCache), IMongoMessageOutboxStore
{
    /// <summary>Upper bound, in minutes, of the retry backoff delay.</summary>
    private const int MaxBackoffMinutes = 60;

    /// <summary>Default cache lifetime for this service: 12 hours.</summary>
    protected override TimeSpan CacheDefaultExpiration => TimeSpan.FromHours(12);

    /// <summary>
    /// Inserts a new outbox record with status <see cref="OutboxMessageStatus.Pending"/>
    /// and removes the cached filter options so that new values become visible.
    /// </summary>
    /// <param name="messageId">Identifier stored in <c>MessageId</c>.</param>
    /// <param name="messageType">Value stored in <c>MessageType</c>.</param>
    /// <param name="exchange">Value stored in <c>Exchange</c>.</param>
    /// <param name="routingKey">Value stored in <c>RoutingKey</c>.</param>
    /// <param name="payload">Value stored in <c>Payload</c>.</param>
    /// <param name="source">Optional value stored in <c>Source</c>.</param>
    /// <param name="cancellationToken">Token forwarded to the repository and cache calls.</param>
    public async Task CreateAsync(
        string messageId,
        string messageType,
        string exchange,
        string routingKey,
        string payload,
        string? source,
        CancellationToken cancellationToken = default
    )
    {
        var now = DateTime.Now;
        var record = new MessageOutboxRecord
        {
            MessageId = messageId,
            MessageType = messageType,
            Exchange = exchange,
            RoutingKey = routingKey,
            Payload = payload,
            Source = source,
            Status = OutboxMessageStatus.Pending,
            CreateTime = now,
            LastUpdateTime = now,
        };
        await repository.InsertAsync(record, cancellationToken);
        await RemoveCacheAsync(nameof(GetFilterOptionsAsync), cancellationToken: cancellationToken);
    }

    /// <summary>
    /// Sets the status to <see cref="OutboxMessageStatus.Sent"/> and stamps
    /// <c>SentAt</c> and <c>LastUpdateTime</c> for the given message id.
    /// </summary>
    /// <param name="messageId">Message id used as the update filter.</param>
    /// <param name="cancellationToken">Token forwarded to the repository call.</param>
    public async Task MarkSentAsync(string messageId, CancellationToken cancellationToken = default)
    {
        var now = DateTime.Now;
        var update = Builders<MessageOutboxRecord>
            .Update.Set(x => x.Status, OutboxMessageStatus.Sent)
            .Set(x => x.SentAt, now)
            .Set(x => x.LastUpdateTime, now);
        await repository.UpdateAsync(x => x.MessageId == messageId, update, cancellationToken);
    }

    /// <summary>
    /// Records a failed publish attempt: increments <c>PublishAttempts</c>, stores the error,
    /// sets the status to <see cref="OutboxMessageStatus.PublishFailed"/> and schedules
    /// <c>NextPublishRetryAt</c> using an exponential backoff capped at
    /// <see cref="MaxBackoffMinutes"/> minutes. Logs a warning and returns when no record
    /// with the given message id exists.
    /// </summary>
    /// <param name="messageId">Message id of the record to update.</param>
    /// <param name="error">Error text stored in <c>LastPublishError</c>.</param>
    /// <param name="cancellationToken">Token forwarded to the repository calls.</param>
    public async Task MarkPublishFailedAsync(
        string messageId,
        string error,
        CancellationToken cancellationToken = default
    )
    {
        // NOTE(review): this read-then-write sequence is not atomic; two concurrent failures
        // for the same message could both read the same attempt count.
        var existing = await repository
            .SearchFor(Builders<MessageOutboxRecord>.Filter.Eq(x => x.MessageId, messageId))
            .FirstOrDefaultAsync(cancellationToken);

        if (existing == null)
        {
            logger.LogWarning("MarkPublishFailedAsync: 未找到 MessageId={MessageId}", messageId);
            return;
        }

        var now = DateTime.Now;
        var attempts = existing.PublishAttempts + 1;
        // 2^attempts minutes (2, 4, 8, ...); the cap is applied before the cast so a large
        // attempt count cannot overflow the int conversion.
        var backoffMinutes = (int)Math.Min(MaxBackoffMinutes, Math.Pow(2, attempts));
        var nextRetryAt = now.AddMinutes(backoffMinutes);

        var update = Builders<MessageOutboxRecord>
            .Update.Set(x => x.Status, OutboxMessageStatus.PublishFailed)
            .Set(x => x.PublishAttempts, attempts)
            .Set(x => x.LastPublishAttemptAt, now)
            .Set(x => x.NextPublishRetryAt, nextRetryAt)
            .Set(x => x.LastPublishError, error)
            .Set(x => x.LastUpdateTime, now);

        await repository.UpdateAsync(x => x.MessageId == messageId, update, cancellationToken);
    }

    /// <summary>
    /// Sets the status to <see cref="OutboxMessageStatus.Consumed"/> and stamps
    /// <c>ConsumedAt</c> and <c>LastUpdateTime</c> for the given message id.
    /// </summary>
    /// <param name="messageId">Message id used as the update filter.</param>
    /// <param name="cancellationToken">Token forwarded to the repository call.</param>
    public async Task MarkConsumedAsync(
        string messageId,
        CancellationToken cancellationToken = default
    )
    {
        var now = DateTime.Now;
        var update = Builders<MessageOutboxRecord>
            .Update.Set(x => x.Status, OutboxMessageStatus.Consumed)
            .Set(x => x.ConsumedAt, now)
            .Set(x => x.LastUpdateTime, now);
        await repository.UpdateAsync(x => x.MessageId == messageId, update, cancellationToken);
    }

    /// <summary>
    /// Records a failed consume attempt: increments <c>ConsumeAttempts</c>, stores the error,
    /// sets the status to <see cref="OutboxMessageStatus.ConsumeFailed"/> and schedules
    /// <c>NextConsumeRetryAt</c> using an exponential backoff capped at
    /// <see cref="MaxBackoffMinutes"/> minutes. Logs a warning and returns when no record
    /// with the given message id exists.
    /// </summary>
    /// <param name="messageId">Message id of the record to update.</param>
    /// <param name="error">Error text stored in <c>LastConsumeError</c>.</param>
    /// <param name="cancellationToken">Token forwarded to the repository calls.</param>
    public async Task MarkConsumeFailedAsync(
        string messageId,
        string error,
        CancellationToken cancellationToken = default
    )
    {
        // NOTE(review): this read-then-write sequence is not atomic; two concurrent failures
        // for the same message could both read the same attempt count.
        var existing = await repository
            .SearchFor(Builders<MessageOutboxRecord>.Filter.Eq(x => x.MessageId, messageId))
            .FirstOrDefaultAsync(cancellationToken);

        if (existing == null)
        {
            // Mirror MarkPublishFailedAsync so a missing record is visible in the logs
            // instead of being dropped silently.
            logger.LogWarning("MarkConsumeFailedAsync: 未找到 MessageId={MessageId}", messageId);
            return;
        }

        var now = DateTime.Now;
        var attempts = existing.ConsumeAttempts + 1;
        // 2^attempts minutes (2, 4, 8, ...); the cap is applied before the cast so a large
        // attempt count cannot overflow the int conversion.
        var backoffMinutes = (int)Math.Min(MaxBackoffMinutes, Math.Pow(2, attempts));
        var nextRetryAt = now.AddMinutes(backoffMinutes);

        var update = Builders<MessageOutboxRecord>
            .Update.Set(x => x.Status, OutboxMessageStatus.ConsumeFailed)
            .Set(x => x.ConsumeAttempts, attempts)
            .Set(x => x.LastConsumeAttemptAt, now)
            .Set(x => x.NextConsumeRetryAt, nextRetryAt)
            .Set(x => x.LastConsumeError, error)
            .Set(x => x.LastUpdateTime, now);

        await repository.UpdateAsync(x => x.MessageId == messageId, update, cancellationToken);
    }

    /// <summary>
    /// Returns up to <paramref name="batchSize"/> records whose status is
    /// <see cref="OutboxMessageStatus.PublishFailed"/> and whose <c>NextPublishRetryAt</c>
    /// is not later than the current time, ordered by <c>NextPublishRetryAt</c> ascending.
    /// </summary>
    /// <param name="batchSize">Value passed to the query's <c>Limit</c>.</param>
    /// <param name="cancellationToken">Token forwarded to the repository call.</param>
    /// <returns>The matching records projected to <see cref="MessageOutboxEntry"/>.</returns>
    public async Task<IReadOnlyList<MessageOutboxEntry>> GetPendingPublishRetryAsync(
        int batchSize,
        CancellationToken cancellationToken = default
    )
    {
        var now = DateTime.Now;
        var filter = Builders<MessageOutboxRecord>.Filter.And(
            Builders<MessageOutboxRecord>.Filter.Eq(
                x => x.Status,
                OutboxMessageStatus.PublishFailed
            ),
            Builders<MessageOutboxRecord>.Filter.Lte(x => x.NextPublishRetryAt, now)
        );
        // Sort before limiting so the batch is deterministic and the records that have been
        // due the longest are picked first rather than an arbitrary subset.
        var records = await repository
            .SearchFor(filter)
            .Sort(Builders<MessageOutboxRecord>.Sort.Ascending(x => x.NextPublishRetryAt))
            .Limit(batchSize)
            .ToListAsync(cancellationToken);
        return records.Select(ToEntry).ToList();
    }

    /// <summary>
    /// Returns up to <paramref name="batchSize"/> records whose status is
    /// <see cref="OutboxMessageStatus.ConsumeFailed"/> and whose <c>NextConsumeRetryAt</c>
    /// is not later than the current time, ordered by <c>NextConsumeRetryAt</c> ascending.
    /// </summary>
    /// <param name="batchSize">Value passed to the query's <c>Limit</c>.</param>
    /// <param name="cancellationToken">Token forwarded to the repository call.</param>
    /// <returns>The matching records projected to <see cref="MessageOutboxEntry"/>.</returns>
    public async Task<IReadOnlyList<MessageOutboxEntry>> GetPendingConsumeRetryAsync(
        int batchSize,
        CancellationToken cancellationToken = default
    )
    {
        var now = DateTime.Now;
        var filter = Builders<MessageOutboxRecord>.Filter.And(
            Builders<MessageOutboxRecord>.Filter.Eq(
                x => x.Status,
                OutboxMessageStatus.ConsumeFailed
            ),
            Builders<MessageOutboxRecord>.Filter.Lte(x => x.NextConsumeRetryAt, now)
        );
        // Sort before limiting so the batch is deterministic and the records that have been
        // due the longest are picked first rather than an arbitrary subset.
        var records = await repository
            .SearchFor(filter)
            .Sort(Builders<MessageOutboxRecord>.Sort.Ascending(x => x.NextConsumeRetryAt))
            .Limit(batchSize)
            .ToListAsync(cancellationToken);
        return records.Select(ToEntry).ToList();
    }

    /// <summary>
    /// Returns one page of outbox records matching <paramref name="request"/>,
    /// ordered by <c>CreateTime</c> descending.
    /// </summary>
    /// <param name="request">Search criteria plus <c>PageIndex</c> and <c>PageSize</c>.</param>
    /// <param name="cancellationToken">Token forwarded to the repository call.</param>
    public async Task<IPagedList<MessageOutboxResponse>> GetPagedListAsync(
        MessageOutboxRequest request,
        CancellationToken cancellationToken = default
    )
    {
        var filter = BuildSearchFilter(request);
        var sort = Builders<MessageOutboxRecord>.Sort.Descending(x => x.CreateTime);
        return await repository
            .SearchFor(filter)
            .Sort(sort)
            .ToPagedListAsync<MessageOutboxRecord, MessageOutboxResponse>(
                request.PageIndex,
                request.PageSize,
                cancellationToken
            );
    }

    /// <summary>
    /// Deletes the record with the given id through <c>TryDeleteAsync</c> and removes the
    /// cached filter options.
    /// </summary>
    /// <param name="id">Id passed to the repository's <c>TryDeleteAsync</c>.</param>
    /// <param name="cancellationToken">Token forwarded to the repository and cache calls.</param>
    public async Task DeleteAsync(string id, CancellationToken cancellationToken = default)
    {
        await repository.TryDeleteAsync(id, cancellationToken);
        await RemoveCacheAsync(nameof(GetFilterOptionsAsync), cancellationToken: cancellationToken);
    }

    /// <summary>
    /// Returns the distinct, non-blank values of <c>MessageType</c>, <c>Exchange</c>,
    /// <c>RoutingKey</c> and <c>Source</c>. The result is obtained through
    /// <c>GetOrSetCacheAsync</c> under a key equal to this method's name.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the cache call.</param>
    public async Task<MessageOutboxFilterOptionsResponse> GetFilterOptionsAsync(
        CancellationToken cancellationToken = default
    )
    {
        return await GetOrSetCacheAsync<MessageOutboxFilterOptionsResponse>(
            nameof(GetFilterOptionsAsync),
            async (_, ct) =>
            {
                var messageTypes = await GetDistinctValuesAsync(
                    nameof(MessageOutboxRecord.MessageType),
                    ct
                );
                var exchanges = await GetDistinctValuesAsync(
                    nameof(MessageOutboxRecord.Exchange),
                    ct
                );
                var routingKeys = await GetDistinctValuesAsync(
                    nameof(MessageOutboxRecord.RoutingKey),
                    ct
                );
                var sources = await GetDistinctValuesAsync(nameof(MessageOutboxRecord.Source), ct);

                return new MessageOutboxFilterOptionsResponse
                {
                    MessageTypes = messageTypes,
                    Exchanges = exchanges,
                    RoutingKeys = routingKeys,
                    Sources = sources,
                };
            },
            cancellationToken: cancellationToken
        );
    }

    /// <summary>Projects a stored record to the entry shape returned by the retry queries.</summary>
    private static MessageOutboxEntry ToEntry(MessageOutboxRecord record)
    {
        return new MessageOutboxEntry(
            record.MessageId,
            record.Exchange,
            record.RoutingKey,
            record.Payload,
            record.PublishAttempts,
            record.ConsumeAttempts
        );
    }

    /// <summary>
    /// Runs a distinct query on <paramref name="field"/> over the whole collection and returns
    /// the non-blank values, de-duplicated and sorted with ordinal comparison.
    /// </summary>
    private async Task<List<string>> GetDistinctValuesAsync(
        FieldDefinition<MessageOutboxRecord, string> field,
        CancellationToken cancellationToken = default
    )
    {
        using var cursor = await repository.Collection.DistinctAsync(
            field,
            Builders<MessageOutboxRecord>.Filter.Empty,
            cancellationToken: cancellationToken
        );
        var values = await cursor.ToListAsync(cancellationToken);
        return values
            .Where(x => !string.IsNullOrWhiteSpace(x))
            .Distinct(StringComparer.Ordinal)
            .Order(StringComparer.Ordinal)
            .ToList();
    }

    /// <summary>
    /// Builds the MongoDB filter for the paged listing. Every criterion present on
    /// <paramref name="request"/> is combined with AND; an empty filter is returned when
    /// no criterion is set.
    /// </summary>
    private static FilterDefinition<MessageOutboxRecord> BuildSearchFilter(
        MessageOutboxRequest request
    )
    {
        var builder = Builders<MessageOutboxRecord>.Filter;
        var filters = new List<FilterDefinition<MessageOutboxRecord>>();

        if (!string.IsNullOrWhiteSpace(request.Keyword))
        {
            // The keyword is caller-supplied text, not a pattern: escape it so characters
            // such as '(' or '*' are matched literally instead of producing an invalid or
            // expensive regular expression on the server.
            var keyword = new BsonRegularExpression(
                System.Text.RegularExpressions.Regex.Escape(request.Keyword),
                "i"
            );
            filters.Add(
                builder.Or(
                    builder.Regex(x => x.MessageId, keyword),
                    builder.Regex(x => x.MessageType, keyword),
                    builder.Regex(x => x.Exchange, keyword),
                    builder.Regex(x => x.RoutingKey, keyword),
                    builder.Regex(x => x.Source, keyword),
                    builder.Regex(x => x.LastPublishError, keyword),
                    builder.Regex(x => x.LastConsumeError, keyword)
                )
            );
        }

        if (request.Status.HasValue)
        {
            filters.Add(builder.Eq(x => x.Status, request.Status.Value));
        }

        if (!string.IsNullOrWhiteSpace(request.MessageType))
        {
            filters.Add(builder.Eq(x => x.MessageType, request.MessageType));
        }

        if (!string.IsNullOrWhiteSpace(request.Exchange))
        {
            filters.Add(builder.Eq(x => x.Exchange, request.Exchange));
        }

        if (!string.IsNullOrWhiteSpace(request.RoutingKey))
        {
            filters.Add(builder.Eq(x => x.RoutingKey, request.RoutingKey));
        }

        if (!string.IsNullOrWhiteSpace(request.Source))
        {
            filters.Add(builder.Eq(x => x.Source, request.Source));
        }

        // CreateTime range, inclusive on both ends.
        if (request.StartTime.HasValue)
        {
            filters.Add(builder.Gte(x => x.CreateTime, request.StartTime.Value));
        }

        if (request.EndTime.HasValue)
        {
            filters.Add(builder.Lte(x => x.CreateTime, request.EndTime.Value));
        }

        return filters.Count > 0 ? builder.And(filters) : builder.Empty;
    }
}
⚠⚠    以下内容为AI分析的结果,请根据实际情况进行判断。

这是一个基于 MongoDB 的消息发件箱(Message Outbox)存储服务实现。该类实现了 Outbox 模式中的存储部分,用于记录并跟踪分布式系统中消息的发送与消费状态,以支持可靠投递。下面分模块说明:

类的职责

MongoMessageOutboxStore 是一个密封类,继承自 AbstractCacheService 并实现 IMongoMessageOutboxStore 接口,主要用于:

  • 管理待发送和待重试的消息
  • 跟踪消息的生命周期状态
  • 提供重试机制和错误处理
  • 支持缓存和分页查询

主要功能模块

1. 消息创建

public async Task CreateAsync(...)
  • 创建新的出站消息记录
  • 初始状态为 Pending
  • 清除相关缓存

2. 状态管理

  • MarkSentAsync: 标记消息为已发送
  • MarkConsumedAsync: 标记消息为已消费
  • MarkPublishFailedAsync: 标记发布失败,实现指数退避重试
  • MarkConsumeFailedAsync: 标记消费失败,实现指数退避重试

3. 重试机制

var backoffMinutes = (int)Math.Min(MaxBackoffMinutes, Math.Pow(2, attempts));
  • 使用指数退避算法计算重试时间:第 n 次失败后等待 2^n 分钟(依次为 2、4、8、16、32 分钟)
  • 最大退避时间限制为60分钟
  • 根据尝试次数动态调整重试间隔

4. 批量查询

  • GetPendingPublishRetryAsync: 获取需要重新发布的消息
  • GetPendingConsumeRetryAsync: 获取需要重新消费的消息
  • 支持批量大小限制,避免一次加载过多数据

5. 数据查询和过滤

  • GetPagedListAsync: 分页查询消息记录
  • GetFilterOptionsAsync: 获取过滤选项,支持缓存
  • BuildSearchFilter: 构建复杂的搜索条件,支持关键词模糊搜索

6. 缓存机制

protected override TimeSpan CacheDefaultExpiration => TimeSpan.FromHours(12);
  • 继承自 AbstractCacheService,提供缓存功能
  • 默认缓存12小时
  • 在数据变更时主动清除相关缓存

设计模式和特点

1. Outbox模式

这是一个典型的Outbox模式实现,确保:

  • 业务操作和消息发送的原子性
  • 消息最终一致性
  • 失败重试机制

2. 指数退避重试

Math.Pow(2, attempts)  // 2^attempts

避免在系统故障时造成雪崩效应。

3. 状态机管理

消息状态流转:

  • Pending → Sent → Consumed (正常流程)
  • 发布失败 → PublishFailed → 到达 NextPublishRetryAt 后重试发布
  • 消费失败 → ConsumeFailed → 到达 NextConsumeRetryAt 后重试消费

4. 缓存优化

  • 过滤选项缓存,减少数据库查询
  • 数据变更时主动清除缓存保证一致性

技术栈

  • MongoDB: 作为数据存储
  • FusionCache: 提供缓存功能
  • 异步编程: 全面使用async/await
  • CancellationToken: 支持操作取消

这个实现非常适合微服务架构中需要保证消息可靠性的场景,通过Outbox模式确保了数据一致性,同时提供了完善的重试机制和监控能力。

评论加载中...