using Dpz.Core.MessageQueue.Models;
using Dpz.Core.Public.ViewModel.Request;
using Dpz.Core.Public.ViewModel.Response;
namespace Dpz.Core.Service.RepositoryServiceImpl;
/// <summary>
/// MongoDB-backed store for message outbox records. It creates records, moves them
/// between the <see cref="OutboxMessageStatus"/> states, returns failed messages that
/// are due for a retry, and serves the paged/filtered queries used to browse them.
/// </summary>
/// <remarks>
/// Timestamps are taken from <see cref="DateTime.Now"/>.
/// NOTE(review): how they are persisted depends on the serializer configuration of
/// <see cref="MessageOutboxRecord"/>, which is not visible here.
/// </remarks>
public sealed class MongoMessageOutboxStore(
    IRepository<MessageOutboxRecord> repository,
    ILogger<MongoMessageOutboxStore> logger,
    IFusionCache fusionCache
) : AbstractCacheService(fusionCache), IMongoMessageOutboxStore
{
    /// <summary>Upper bound, in minutes, applied to the exponential retry backoff.</summary>
    private const int MaxBackoffMinutes = 60;

    /// <summary>Overrides the base cache service's default expiration with 12 hours.</summary>
    protected override TimeSpan CacheDefaultExpiration => TimeSpan.FromHours(12);

    /// <summary>
    /// Inserts a new outbox record in the <see cref="OutboxMessageStatus.Pending"/> state
    /// and drops the cached filter options so the new values become selectable.
    /// </summary>
    /// <param name="messageId">Identifier later used to look the record up.</param>
    /// <param name="messageType">Message type name stored on the record.</param>
    /// <param name="exchange">Exchange stored on the record.</param>
    /// <param name="routingKey">Routing key stored on the record.</param>
    /// <param name="payload">Message body stored on the record.</param>
    /// <param name="source">Optional origin of the message.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    public async Task CreateAsync(
        string messageId,
        string messageType,
        string exchange,
        string routingKey,
        string payload,
        string? source,
        CancellationToken cancellationToken = default
    )
    {
        var now = DateTime.Now;
        var record = new MessageOutboxRecord
        {
            MessageId = messageId,
            MessageType = messageType,
            Exchange = exchange,
            RoutingKey = routingKey,
            Payload = payload,
            Source = source,
            Status = OutboxMessageStatus.Pending,
            CreateTime = now,
            LastUpdateTime = now,
        };

        await repository.InsertAsync(record, cancellationToken);
        await RemoveCacheAsync(nameof(GetFilterOptionsAsync), cancellationToken: cancellationToken);
    }

    /// <summary>
    /// Sets the record's status to <see cref="OutboxMessageStatus.Sent"/> and stamps
    /// <c>SentAt</c> and <c>LastUpdateTime</c> with the current time.
    /// </summary>
    /// <param name="messageId">Identifier of the message to update.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    public async Task MarkSentAsync(string messageId, CancellationToken cancellationToken = default)
    {
        var now = DateTime.Now;
        var update = Builders<MessageOutboxRecord>
            .Update.Set(x => x.Status, OutboxMessageStatus.Sent)
            .Set(x => x.SentAt, now)
            .Set(x => x.LastUpdateTime, now);

        await repository.UpdateAsync(x => x.MessageId == messageId, update, cancellationToken);
    }

    /// <summary>
    /// Records a failed publish attempt: increments the attempt counter, stores the error
    /// and schedules the next retry using exponential backoff (2^attempts minutes, capped
    /// at <see cref="MaxBackoffMinutes"/>).
    /// </summary>
    /// <remarks>
    /// When no record matches <paramref name="messageId"/> a warning is logged and nothing
    /// is written.
    /// NOTE(review): the counter is read and then written back in two separate calls, so
    /// concurrent failures for the same message may lose an increment.
    /// </remarks>
    /// <param name="messageId">Identifier of the message that failed to publish.</param>
    /// <param name="error">Error text stored in <c>LastPublishError</c>.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    public async Task MarkPublishFailedAsync(
        string messageId,
        string error,
        CancellationToken cancellationToken = default
    )
    {
        var existing = await FindByMessageIdAsync(messageId, cancellationToken);
        if (existing == null)
        {
            logger.LogWarning("MarkPublishFailedAsync: 未找到 MessageId={MessageId}", messageId);
            return;
        }

        var now = DateTime.Now;
        var attempts = existing.PublishAttempts + 1;
        // Min is applied before the cast, so a huge power can never overflow the int.
        var backoffMinutes = (int)Math.Min(MaxBackoffMinutes, Math.Pow(2, attempts));
        var nextRetryAt = now.AddMinutes(backoffMinutes);

        var update = Builders<MessageOutboxRecord>
            .Update.Set(x => x.Status, OutboxMessageStatus.PublishFailed)
            .Set(x => x.PublishAttempts, attempts)
            .Set(x => x.LastPublishAttemptAt, now)
            .Set(x => x.NextPublishRetryAt, nextRetryAt)
            .Set(x => x.LastPublishError, error)
            .Set(x => x.LastUpdateTime, now);

        await repository.UpdateAsync(x => x.MessageId == messageId, update, cancellationToken);
    }

    /// <summary>
    /// Sets the record's status to <see cref="OutboxMessageStatus.Consumed"/> and stamps
    /// <c>ConsumedAt</c> and <c>LastUpdateTime</c> with the current time.
    /// </summary>
    /// <param name="messageId">Identifier of the message to update.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    public async Task MarkConsumedAsync(
        string messageId,
        CancellationToken cancellationToken = default
    )
    {
        var now = DateTime.Now;
        var update = Builders<MessageOutboxRecord>
            .Update.Set(x => x.Status, OutboxMessageStatus.Consumed)
            .Set(x => x.ConsumedAt, now)
            .Set(x => x.LastUpdateTime, now);

        await repository.UpdateAsync(x => x.MessageId == messageId, update, cancellationToken);
    }

    /// <summary>
    /// Records a failed consume attempt: increments the attempt counter, stores the error
    /// and schedules the next retry using exponential backoff (2^attempts minutes, capped
    /// at <see cref="MaxBackoffMinutes"/>).
    /// </summary>
    /// <remarks>
    /// When no record matches <paramref name="messageId"/> a warning is logged and nothing
    /// is written, mirroring <see cref="MarkPublishFailedAsync"/>.
    /// NOTE(review): the counter is read and then written back in two separate calls, so
    /// concurrent failures for the same message may lose an increment.
    /// </remarks>
    /// <param name="messageId">Identifier of the message that failed to be consumed.</param>
    /// <param name="error">Error text stored in <c>LastConsumeError</c>.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    public async Task MarkConsumeFailedAsync(
        string messageId,
        string error,
        CancellationToken cancellationToken = default
    )
    {
        var existing = await FindByMessageIdAsync(messageId, cancellationToken);
        if (existing == null)
        {
            // Previously this path returned silently, hiding lost or mistyped message ids.
            logger.LogWarning("MarkConsumeFailedAsync: 未找到 MessageId={MessageId}", messageId);
            return;
        }

        var now = DateTime.Now;
        var attempts = existing.ConsumeAttempts + 1;
        // Min is applied before the cast, so a huge power can never overflow the int.
        var backoffMinutes = (int)Math.Min(MaxBackoffMinutes, Math.Pow(2, attempts));
        var nextRetryAt = now.AddMinutes(backoffMinutes);

        var update = Builders<MessageOutboxRecord>
            .Update.Set(x => x.Status, OutboxMessageStatus.ConsumeFailed)
            .Set(x => x.ConsumeAttempts, attempts)
            .Set(x => x.LastConsumeAttemptAt, now)
            .Set(x => x.NextConsumeRetryAt, nextRetryAt)
            .Set(x => x.LastConsumeError, error)
            .Set(x => x.LastUpdateTime, now);

        await repository.UpdateAsync(x => x.MessageId == messageId, update, cancellationToken);
    }

    /// <summary>
    /// Returns up to <paramref name="batchSize"/> messages in the
    /// <see cref="OutboxMessageStatus.PublishFailed"/> state whose next publish retry time
    /// has been reached, earliest due first.
    /// </summary>
    /// <param name="batchSize">
    /// Maximum number of entries to return. Pass a positive value: MongoDB treats a limit
    /// of 0 as "no limit".
    /// </param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>The due entries, projected to <see cref="MessageOutboxEntry"/>.</returns>
    public async Task<IReadOnlyList<MessageOutboxEntry>> GetPendingPublishRetryAsync(
        int batchSize,
        CancellationToken cancellationToken = default
    )
    {
        var now = DateTime.Now;
        var filter = Builders<MessageOutboxRecord>.Filter.And(
            Builders<MessageOutboxRecord>.Filter.Eq(
                x => x.Status,
                OutboxMessageStatus.PublishFailed
            ),
            Builders<MessageOutboxRecord>.Filter.Lte(x => x.NextPublishRetryAt, now)
        );
        var sort = Builders<MessageOutboxRecord>.Sort.Ascending(x => x.NextPublishRetryAt);

        return await GetRetryBatchAsync(filter, sort, batchSize, cancellationToken);
    }

    /// <summary>
    /// Returns up to <paramref name="batchSize"/> messages in the
    /// <see cref="OutboxMessageStatus.ConsumeFailed"/> state whose next consume retry time
    /// has been reached, earliest due first.
    /// </summary>
    /// <param name="batchSize">
    /// Maximum number of entries to return. Pass a positive value: MongoDB treats a limit
    /// of 0 as "no limit".
    /// </param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>The due entries, projected to <see cref="MessageOutboxEntry"/>.</returns>
    public async Task<IReadOnlyList<MessageOutboxEntry>> GetPendingConsumeRetryAsync(
        int batchSize,
        CancellationToken cancellationToken = default
    )
    {
        var now = DateTime.Now;
        var filter = Builders<MessageOutboxRecord>.Filter.And(
            Builders<MessageOutboxRecord>.Filter.Eq(
                x => x.Status,
                OutboxMessageStatus.ConsumeFailed
            ),
            Builders<MessageOutboxRecord>.Filter.Lte(x => x.NextConsumeRetryAt, now)
        );
        var sort = Builders<MessageOutboxRecord>.Sort.Ascending(x => x.NextConsumeRetryAt);

        return await GetRetryBatchAsync(filter, sort, batchSize, cancellationToken);
    }

    /// <summary>
    /// Returns one page of outbox records matching <paramref name="request"/>, newest
    /// (<c>CreateTime</c> descending) first.
    /// </summary>
    /// <param name="request">Search criteria plus <c>PageIndex</c>/<c>PageSize</c>.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>The requested page mapped to <see cref="MessageOutboxResponse"/>.</returns>
    public async Task<IPagedList<MessageOutboxResponse>> GetPagedListAsync(
        MessageOutboxRequest request,
        CancellationToken cancellationToken = default
    )
    {
        var filter = BuildSearchFilter(request);
        var sort = Builders<MessageOutboxRecord>.Sort.Descending(x => x.CreateTime);

        return await repository
            .SearchFor(filter)
            .Sort(sort)
            .ToPagedListAsync<MessageOutboxRecord, MessageOutboxResponse>(
                request.PageIndex,
                request.PageSize,
                cancellationToken
            );
    }

    /// <summary>
    /// Deletes a record through the repository's <c>TryDeleteAsync</c> and drops the cached
    /// filter options.
    /// </summary>
    /// <param name="id">
    /// Key handed to the repository. NOTE(review): presumably the document id rather than
    /// the message id; confirm against the repository.
    /// </param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    public async Task DeleteAsync(string id, CancellationToken cancellationToken = default)
    {
        await repository.TryDeleteAsync(id, cancellationToken);
        await RemoveCacheAsync(nameof(GetFilterOptionsAsync), cancellationToken: cancellationToken);
    }

    /// <summary>
    /// Returns the distinct message types, exchanges, routing keys and sources present in
    /// the collection. The result is cached under this method's name and is invalidated by
    /// <see cref="CreateAsync"/> and <see cref="DeleteAsync"/>.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>The selectable values for each filter field.</returns>
    public async Task<MessageOutboxFilterOptionsResponse> GetFilterOptionsAsync(
        CancellationToken cancellationToken = default
    )
    {
        return await GetOrSetCacheAsync<MessageOutboxFilterOptionsResponse>(
            nameof(GetFilterOptionsAsync),
            async (_, ct) =>
            {
                var messageTypes = await GetDistinctValuesAsync(
                    nameof(MessageOutboxRecord.MessageType),
                    ct
                );
                var exchanges = await GetDistinctValuesAsync(
                    nameof(MessageOutboxRecord.Exchange),
                    ct
                );
                var routingKeys = await GetDistinctValuesAsync(
                    nameof(MessageOutboxRecord.RoutingKey),
                    ct
                );
                var sources = await GetDistinctValuesAsync(nameof(MessageOutboxRecord.Source), ct);

                return new MessageOutboxFilterOptionsResponse
                {
                    MessageTypes = messageTypes,
                    Exchanges = exchanges,
                    RoutingKeys = routingKeys,
                    Sources = sources,
                };
            },
            cancellationToken: cancellationToken
        );
    }

    /// <summary>Loads the first record whose <c>MessageId</c> equals the given id, if any.</summary>
    private async Task<MessageOutboxRecord?> FindByMessageIdAsync(
        string messageId,
        CancellationToken cancellationToken
    )
    {
        return await repository
            .SearchFor(Builders<MessageOutboxRecord>.Filter.Eq(x => x.MessageId, messageId))
            .FirstOrDefaultAsync(cancellationToken);
    }

    /// <summary>
    /// Runs a retry query: applies the filter, orders by the given sort so the earliest
    /// due messages are picked first, caps the result at <paramref name="batchSize"/> and
    /// projects each record to a <see cref="MessageOutboxEntry"/>.
    /// </summary>
    private async Task<IReadOnlyList<MessageOutboxEntry>> GetRetryBatchAsync(
        FilterDefinition<MessageOutboxRecord> filter,
        SortDefinition<MessageOutboxRecord> sort,
        int batchSize,
        CancellationToken cancellationToken
    )
    {
        // Without an explicit sort the order of a limited result is undefined, which can
        // starve long-overdue messages behind newer ones.
        var records = await repository
            .SearchFor(filter)
            .Sort(sort)
            .Limit(batchSize)
            .ToListAsync(cancellationToken);

        return records.Select(ToEntry).ToList();
    }

    /// <summary>Projects a stored record to the entry handed to retry workers.</summary>
    private static MessageOutboxEntry ToEntry(MessageOutboxRecord record)
    {
        return new MessageOutboxEntry(
            record.MessageId,
            record.Exchange,
            record.RoutingKey,
            record.Payload,
            record.PublishAttempts,
            record.ConsumeAttempts
        );
    }

    /// <summary>
    /// Returns the distinct, non-blank values of a string field across the whole
    /// collection, sorted ordinally.
    /// </summary>
    private async Task<List<string>> GetDistinctValuesAsync(
        FieldDefinition<MessageOutboxRecord, string> field,
        CancellationToken cancellationToken = default
    )
    {
        using var cursor = await repository.Collection.DistinctAsync(
            field,
            Builders<MessageOutboxRecord>.Filter.Empty,
            cancellationToken: cancellationToken
        );
        var values = await cursor.ToListAsync(cancellationToken);

        return values
            .Where(x => !string.IsNullOrWhiteSpace(x))
            .Distinct(StringComparer.Ordinal)
            .Order(StringComparer.Ordinal)
            .ToList();
    }

    /// <summary>
    /// Translates the request into a MongoDB filter. Every supplied criterion is combined
    /// with AND; when none is supplied the empty (match-all) filter is returned.
    /// </summary>
    /// <remarks>
    /// The keyword is matched case-insensitively as a literal substring against the id,
    /// type, exchange, routing key, source and both error fields.
    /// </remarks>
    private static FilterDefinition<MessageOutboxRecord> BuildSearchFilter(
        MessageOutboxRequest request
    )
    {
        var builder = Builders<MessageOutboxRecord>.Filter;
        var filters = new List<FilterDefinition<MessageOutboxRecord>>();

        if (!string.IsNullOrWhiteSpace(request.Keyword))
        {
            // The keyword is user input: escape it so regex metacharacters are matched
            // literally instead of producing an invalid or expensive pattern.
            var pattern = System.Text.RegularExpressions.Regex.Escape(request.Keyword);
            var keyword = new BsonRegularExpression(pattern, "i");
            filters.Add(
                builder.Or(
                    builder.Regex(x => x.MessageId, keyword),
                    builder.Regex(x => x.MessageType, keyword),
                    builder.Regex(x => x.Exchange, keyword),
                    builder.Regex(x => x.RoutingKey, keyword),
                    builder.Regex(x => x.Source, keyword),
                    builder.Regex(x => x.LastPublishError, keyword),
                    builder.Regex(x => x.LastConsumeError, keyword)
                )
            );
        }

        if (request.Status.HasValue)
        {
            filters.Add(builder.Eq(x => x.Status, request.Status.Value));
        }

        if (!string.IsNullOrWhiteSpace(request.MessageType))
        {
            filters.Add(builder.Eq(x => x.MessageType, request.MessageType));
        }

        if (!string.IsNullOrWhiteSpace(request.Exchange))
        {
            filters.Add(builder.Eq(x => x.Exchange, request.Exchange));
        }

        if (!string.IsNullOrWhiteSpace(request.RoutingKey))
        {
            filters.Add(builder.Eq(x => x.RoutingKey, request.RoutingKey));
        }

        if (!string.IsNullOrWhiteSpace(request.Source))
        {
            filters.Add(builder.Eq(x => x.Source, request.Source));
        }

        if (request.StartTime.HasValue)
        {
            filters.Add(builder.Gte(x => x.CreateTime, request.StartTime.Value));
        }

        if (request.EndTime.HasValue)
        {
            filters.Add(builder.Lte(x => x.CreateTime, request.EndTime.Value));
        }

        return filters.Count > 0 ? builder.And(filters) : builder.Empty;
    }
}
⚠⚠ 以下内容为AI分析的结果,请根据实际情况进行判断。
这是一个基于MongoDB的消息出站箱(Message Outbox)存储服务实现。这个类实现了Outbox模式,用于确保分布式系统中消息的可靠发送和消费。让我详细解释一下:
类的职责
MongoMessageOutboxStore 是一个密封类,继承自 AbstractCacheService 并实现 IMongoMessageOutboxStore 接口,主要用于:
- 管理待发送和待重试的消息
- 跟踪消息的生命周期状态
- 提供重试机制和错误处理
- 支持缓存和分页查询
主要功能模块
1. 消息创建
public async Task CreateAsync(...)
- 创建新的出站消息记录
- 初始状态为 Pending
- 清除相关缓存
2. 状态管理
- MarkSentAsync: 标记消息为已发送
- MarkConsumedAsync: 标记消息为已消费
- MarkPublishFailedAsync: 标记发布失败,实现指数退避重试
- MarkConsumeFailedAsync: 标记消费失败,实现指数退避重试
3. 重试机制
var backoffMinutes = (int)Math.Min(MaxBackoffMinutes, Math.Pow(2, attempts));
- 使用指数退避算法计算重试时间
- 最大退避时间限制为60分钟
- 根据尝试次数动态调整重试间隔
4. 批量查询
- GetPendingPublishRetryAsync: 获取需要重新发布的消息
- GetPendingConsumeRetryAsync: 获取需要重新消费的消息
- 支持批量大小限制,避免一次加载过多数据
5. 数据查询和过滤
- GetPagedListAsync: 分页查询消息记录
- GetFilterOptionsAsync: 获取过滤选项,支持缓存
- BuildSearchFilter: 构建复杂的搜索条件,支持关键词模糊搜索
6. 缓存机制
protected override TimeSpan CacheDefaultExpiration => TimeSpan.FromHours(12);
- 继承自 AbstractCacheService,提供缓存功能
- 默认缓存12小时
- 在数据变更时主动清除相关缓存
设计模式和特点
1. Outbox模式
这是一个典型的Outbox模式实现,确保:
- 业务操作和消息发送的原子性
- 消息最终一致性
- 失败重试机制
2. 指数退避重试
Math.Pow(2, attempts) // 2^attempts
避免在系统故障时造成雪崩效应。
3. 状态机管理
消息状态流转:
- Pending → Sent → Consumed (正常流程)
- Pending/Failed → PublishFailed → 重试
- Sent/Failed → ConsumeFailed → 重试
4. 缓存优化
- 过滤选项缓存,减少数据库查询
- 数据变更时主动清除缓存保证一致性
技术栈
- MongoDB: 作为数据存储
- FusionCache: 提供缓存功能
- 异步编程: 全面使用async/await
- CancellationToken: 支持操作取消
这个实现非常适合微服务架构中需要保证消息可靠性的场景,通过Outbox模式确保了数据一致性,同时提供了完善的重试机制和监控能力。
评论加载中...