cap集成sqlsugar事务一致性问题 返回

SqlSugar 处理完成
5 381

我参照着官方文档示例实现:https://www.donet5.com/Ask/9/23459


/// <summary>
/// 重写Cap事务接口,集成SqlSugarClient的事务对象
/// </summary>
public class CapSqlSugarTransaction : CapTransactionBase
{
    public CapSqlSugarTransaction(IDispatcher dispatcher, ISqlSugarClient ado) : base(dispatcher)
    {
        Ado = ado;
        DbTransaction = ado;
    }

    public ISqlSugarClient Ado { get; set; }

    public override void Commit()
    {
        Ado.AsTenant().CommitTran();
        Flush();
    }

    public override async Task CommitAsync(CancellationToken cancellationToken = default)
    {
        await Ado.AsTenant().CommitTranAsync();
        Flush();
    }

    public override void Dispose()
    {
        Ado.AsTenant().Close();
    }

    public override void Rollback()
    {
        Ado.AsTenant().RollbackTran();
    }

    public override async Task RollbackAsync(CancellationToken cancellationToken = default)
    {
        await Ado.AsTenant().RollbackTranAsync();
    }
}
/// <summary>
/// CAP 事务扩展类
/// </summary>
public static class CapSqlSugarTransactionExtensions
{
    

    /// <summary>  
    /// 开启一个异步步的 CAP 事务。  
    /// </summary>  
    /// <param name="sugarClient">ISqlSugarClient 实例,用于操作数据库。</param>  
    /// <param name="publisher">ICapPublisher 实例,用于发布 CAP 消息。</param>  
    /// <param name="autoCommit">指示事务是否自动提交,默认为 false。</param>  
    /// <returns>返回一个 ICapTransaction 实例,表示 CAP 事务。</returns>  
    public static async Task<ICapTransaction> BeginCapTransactionAsync(this ISqlSugarClient sugarClient, ICapPublisher publisher, bool autoCommit = false)
    {
        var dispatcher = publisher.ServiceProvider.GetRequiredService<IDispatcher>();
        await sugarClient.AsTenant().BeginTranAsync(IsolationLevel.ReadCommitted);

        var transaction = new CapSqlSugarTransaction(dispatcher, sugarClient)
        {
            AutoCommit = autoCommit
        };
        return publisher.Transaction = transaction;
    }
    /// <summary>  
    /// 异步在 CAP 事务中执行指定的操作。  
    /// </summary>  
    /// <typeparam name="T">操作返回的结果类型。</typeparam>  
    /// <param name="db">ISqlSugarClient 实例,用于操作数据库。</param>  
    /// <param name="capPublisher">ICapPublisher 实例,用于发布 CAP 消息。</param>  
    /// <param name="action">要在事务中执行的异步操作。</param>  
    /// <param name="autoCommit">指示事务是否自动提交,默认为 false。</param>  
    /// <returns>返回操作的结果。</returns>  
    public static async Task<T> ExecuteInTransactionAsync<T>(this ISqlSugarClient db, ICapPublisher capPublisher, Func<Task<T>> action, bool autoCommit = false)
    {
        using (var trans = await db.BeginCapTransactionAsync(capPublisher, autoCommit))
        {
            try
            {
                var ret = await action();
                if (!autoCommit)
                {
                    await trans.CommitAsync();
                }
                return ret;
            }
            catch
            {
                await trans.RollbackAsync();
                throw;
            }
        }
    }

    
}
//cap消息服务接口
public class BaseRabbitMQNew : ITransient
{


    public ICapPublisher CapPublisher
    {
        get
        {
            return App.GetService<ICapPublisher>();
        }
    }
    /// <summary>
    /// 发送MQ消息
    /// </summary>
    /// <param name="routeKey">非必填参数,与MQAttribute特性,选择指定其一即可</param>
    public void Publish(object data, string routeKey = "", bool immediate = false)
    {
        if (CapPublisher == null)
        {
            return;
        }

        if (string.IsNullOrWhiteSpace(routeKey))
        {
            var attr = data.GetType().GetCustomAttribute<MQAttribute>();
            if (attr != null)
            {
                routeKey = attr.RouteKey;
            }
        }

        if (string.IsNullOrWhiteSpace(routeKey))
        {
            return;
        }




        if (!immediate)
        {
            CapPublisher.PublishDelay<object>(TimeSpan.FromSeconds(5), routeKey, data);
            return;
        }

        CapPublisher.Publish<object>(routeKey, data);
    }

    /// <summary>
    /// 发送MQ消息
    /// </summary>
    /// <param name="routeKey">非必填参数,与MQAttribute特性,选择指定其一即可</param>
    public async Task PublishAsync(object data, string routeKey = "", bool immediate = false)
    {
        if (CapPublisher == null)
        {
            return;
        }

        if (string.IsNullOrWhiteSpace(routeKey))
        {
            var attr = data.GetType().GetCustomAttribute<MQAttribute>();
            if (attr != null)
            {
                routeKey = attr.RouteKey;
            }
        }

        if (string.IsNullOrWhiteSpace(routeKey))
        {
            return;
        }


        if (!immediate)
        {
            await CapPublisher.PublishDelayAsync<object>(TimeSpan.FromSeconds(5), routeKey, data);
            return;
        }

        await CapPublisher.PublishAsync<object>(routeKey, data);
    }

}


--------------------------------------------------------------------消息对象特性定义----------------------------------------------------------------------------------------


//消息对象特性定义
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
public class MQAttribute : Attribute
{
    /// <summary>
    /// 消息队列Key
    /// </summary>
    public string RouteKey { get; set; }

    public MQAttribute(string routeKey)
    {
        RouteKey = routeKey;
    }
}
-----------------------------------------------------------------------数据库服务基类-------------------------------------------------------------------------------------
//数据库服务基类
public class BaseService : ITransient
{
    private ISqlSugarClient db;

    /// <summary>
    /// 系统数据库
    /// </summary>
    protected ISqlSugarClient _db
    {
        get
        {
            if (App.HttpContext == null)
            {
                db = AppContextHolder.GetData("ThreadContextDB") as ISqlSugarClient;
            }

            if (db == null)
            {
                db = App.GetService<SqlSugarScope>();
            }
            return db;
        }
    }
     /// <summary>
 /// WMS数据库
 /// </summary>
 protected ISqlSugarClient _db_wms
 {
     get
     {
         if (_db is SqlSugarScope)
         {
             return _db.AsTenant().GetConnectionScope(ConstDatabase.WMS);
         }

         return _db.AsTenant().GetConnection(ConstDatabase.WMS);
     }
 }
    
}
------------------------------------------------------------------------------------------------------------------------------------------------------------


//测试代码
public class MqTestService : BaseService
{
    private readonly BaseRabbitMQNew _rabbitMQ;


    public MqTestService(BaseRabbitMQNew rabbitMQ)
    {
       //注入cap消息服务
        _rabbitMQ = rabbitMQ;
    }
    [MQ("wms.test.customize.mq")]
    public class TestCustomizeMqDto
    {
        public string code { get; set; }
        public DateTime time { get; set; }
    }
    /// <summary>
    /// 自定义事务测试
    /// </summary>
    /// <returns></returns>
    public async Task<(bool, string)> CustomizeTestMqAsync()
    {

        TestCustomizeMqDto dto = new TestCustomizeMqDto()
        {

            code = "9999",
            time = DateTime.Now
        };
        return await _db.ExecuteInTransactionAsync(_rabbitMQ.CapPublisher, async ()
              =>
        {
            await _rabbitMQ.PublishAsync(dto);
            var updateResult = await _db_wms.Updateable<WmsPrintLog>().SetColumns(item => new WmsPrintLog() { CreateTime = SqlFunc.GetDate() }, true).Where(item => item.Id == "2023100815091134013b25bf5a81b1d6").ExecuteCommandAsync();
            if (updateResult > 0)
            {
                《!!!!!这里抛异常之后,数据库修改回滚成功了,但是消息没有回滚,发出去了》
                throw Oops.Oh(ConstMessage.UpdateFail.FormatArgs($"打印日志"));
            }
            return (true, ConstMessage.Success);
        }, false);


    }
}




问题:

 自定义事务测试接口中,抛异常之后,数据库修改回滚成功了,但是消息没有回滚,发出去了




热忱回答5

  • fate sta fate sta VIP0
    2025/8/12
     publisher.Transaction.Value


     主要是这个东西传值。

    0 回复
  • fate sta fate sta VIP0
    2025/8/12

    原文中的AsyncLocal指的是这个。你看看设计是不是传值会有问题

    0 回复
  •     VIP0
    2025/8/14

    弄好了吗楼主,我也是遇到这个问题,数据回滚了cap发出去了,没有回滚。


    0 回复
  • fate sta fate sta VIP0
    2025/8/15

    image.png

    @ 

    0 回复
  • fate sta fate sta VIP0
    2025/8/15

    他tran赋值错了。已解决

    0 回复