8000 CAP8.2.0延迟消息无法处理问题求助 · Issue #1661 · dotnetcore/CAP · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
CAP8.2.0延迟消息无法处理问题求助 #1661
Closed
@haoyk

Description

@haoyk

@yang-xiaodong
你好,我们项目将CAP投入生产环境已经两三年时间了。但从开始到现在一直有个问题无法100%复现,困扰到现在。

问题的表象是每过一两个月的时间,订阅者就收不到延迟消息了,同步消息是能还处理的。

我简述下业务场景:
订阅者收到消息后首先做逻辑判断,满足条件的更新业务数据,不满足条件的发布个10分钟的延迟消息还是当前订阅者处理,订阅者继续上述判断直至逻辑条件满足,往往循环几十次。

正常情况下订阅者每处理一个回合都将在cap.published表留下一条StatusName=Succeeded的数据。
每当问题发生时,published表中前一条消息的StatusName都是Queued,而非Succeeded。
因不满足条件而发出的第二条延迟消息,也已经进入了published表,状态也是Queued。

我很确信前一条消息已经被处理过了,因为第二条消息就是处理前一条消息后发布出来的,但前一条消息StatusName应该是Successed,但实现确是Queued。另外还有一个佐证,当这条消息正常消费后会更新ExpiresAt为当前时间+24小时,我的ExpiresAt也确实是+24小时的。(怀疑StatusName是下文ChangePublishStateToDelayedAsync误更新的)
当我手 5D22 删除published表的前一条数据并重启程序后,系统恢复正常。如果不删除该条消息,不管如何重启都解决不了问题。

我知道因为问题无法100%复现出来,很难排查。我想求助作者是否有其他人反馈过类似问题,或者给出排查问题的方向,感谢!

我翻了CAP8.2.0源码,DotNetCore.CAP.MySql.MySqlDataStorage

public async Task ChangePublishStateToDelayedAsync(string[] ids)
{
var sql =
$"UPDATE {_pubName} SET StatusName='{StatusName.Delayed}' WHERE Id IN ({string.Join(',', ids)});";
var connection = new MySqlConnection(_options.Value.ConnectionString);
await using var _ = connection.ConfigureAwait(false);
await connection.ExecuteNonQueryAsync(sql).ConfigureAwait(false);
}

我认为UPDATE 语句加个过滤条件更为稳妥:AND StatusName NOT IN ('Succeeded','Failed')
我怀疑上述问题数据是它产生的,请指正。
但即便有这条异常数据,消息也不应该会hang在那儿不处理了。继续排查

DotNetCore.CAP.Processor.Dispatcher

public async ValueTask EnqueueToScheduler(MediumMessage message, DateTime publishTime, object? transaction = null)
{
    message.ExpiresAt = publishTime;

    var timeSpan = publishTime - DateTime.Now;

    if (timeSpan <= TimeSpan.FromMinutes(1)) //1min
    {
        await _storage.ChangePublishStateAsync(message, StatusName.Queued, transaction);

        _schedulerQueue.Enqueue(message, publishTime.Ticks);

        if (publishTime.Ticks < _nextSendTime)
        {
            _delayCts.Cancel();
        }
    }
    else
    {
        await _storage.ChangePublishStateAsync(message, StatusName.Delayed, transaction);
    }
}

if (publishTime.Ticks < _nextSendTime)
{
_delayCts.Cancel();
}
这里的逻辑处理没看懂,为什么会有这个判断?如果执行了_delayCts.Cancel(),会将 _schedulerQueue.Enqueue(message, publishTime.Ticks)的操作回滚吗?如果回滚这里是不是导致我问题发生的原因呢?请指教。

我的程序版本如下:
abp 7.0.0
cap 8.2.0
mysql 阿里RDS云数据库 8.0.22
rabbitmq 3.10.6

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions

      0