Description
@yang-xiaodong
你好,我们项目将CAP投入生产环境已经两三年时间了。但从开始到现在一直有个问题无法100%复现,困扰到现在。
问题的表象是每过一两个月的时间,订阅者就收不到延迟消息了,同步消息是能还处理的。
我简述下业务场景:
订阅者收到消息后首先做逻辑判断,满足条件的更新业务数据,不满足条件的发布个10分钟的延迟消息还是当前订阅者处理,订阅者继续上述判断直至逻辑条件满足,往往循环几十次。
正常情况下订阅者每处理一个回合都将在cap.published表留下一条StatusName=Succeeded的数据。
每当问题发生时,published表中前一条消息的StatusName都是Queued,而非Succeeded。
因不满足条件而发出的第二条延迟消息,也已经进入了published表,状态也是Queued。
我很确信前一条消息已经被处理过了,因为第二条消息就是处理前一条消息后发布出来的,但前一条消息StatusName应该是Successed,但实现确是Queued。另外还有一个佐证,当这条消息正常消费后会更新ExpiresAt为当前时间+24小时,我的ExpiresAt也确实是+24小时的。(怀疑StatusName是下文ChangePublishStateToDelayedAsync误更新的)
当我手
5D22
删除published表的前一条数据并重启程序后,系统恢复正常。如果不删除该条消息,不管如何重启都解决不了问题。
我知道因为问题无法100%复现出来,很难排查。我想求助作者是否有其他人反馈过类似问题,或者给出排查问题的方向,感谢!
我翻了CAP8.2.0源码,DotNetCore.CAP.MySql.MySqlDataStorage
public async Task ChangePublishStateToDelayedAsync(string[] ids)
{
var sql =
$"UPDATE {_pubName}
SET StatusName
='{StatusName.Delayed}' WHERE Id
IN ({string.Join(',', ids)});";
var connection = new MySqlConnection(_options.Value.ConnectionString);
await using var _ = connection.ConfigureAwait(false);
await connection.ExecuteNonQueryAsync(sql).ConfigureAwait(false);
}
我认为UPDATE 语句加个过滤条件更为稳妥:AND StatusName
NOT IN ('Succeeded','Failed')
我怀疑上述问题数据是它产生的,请指正。
但即便有这条异常数据,消息也不应该会hang在那儿不处理了。继续排查
DotNetCore.CAP.Processor.Dispatcher
public async ValueTask EnqueueToScheduler(MediumMessage message, DateTime publishTime, object? transaction = null)
{
message.ExpiresAt = publishTime;
var timeSpan = publishTime - DateTime.Now;
if (timeSpan <= TimeSpan.FromMinutes(1)) //1min
{
await _storage.ChangePublishStateAsync(message, StatusName.Queued, transaction);
_schedulerQueue.Enqueue(message, publishTime.Ticks);
if (publishTime.Ticks < _nextSendTime)
{
_delayCts.Cancel();
}
}
else
{
await _storage.ChangePublishStateAsync(message, StatusName.Delayed, transaction);
}
}
if (publishTime.Ticks < _nextSendTime)
{
_delayCts.Cancel();
}
这里的逻辑处理没看懂,为什么会有这个判断?如果执行了_delayCts.Cancel(),会将 _schedulerQueue.Enqueue(message, publishTime.Ticks)的操作回滚吗?如果回滚这里是不是导致我问题发生的原因呢?请指教。
我的程序版本如下:
abp 7.0.0
cap 8.2.0
mysql 阿里RDS云数据库 8.0.22
rabbitmq 3.10.6