cap集成sqlsugar事务一致性问题 返回
SqlSugar
处理完成
5
381
悬赏:0 飞吻
我参照着官方文档示例实现:https://www.donet5.com/Ask/9/23459
/// <summary>
/// 重写Cap事务接口,集成SqlSugarClient的事务对象
/// </summary>
public class CapSqlSugarTransaction : CapTransactionBase
{
public CapSqlSugarTransaction(IDispatcher dispatcher, ISqlSugarClient ado) : base(dispatcher)
{
Ado = ado;
DbTransaction = ado;
}
public ISqlSugarClient Ado { get; set; }
public override void Commit()
{
Ado.AsTenant().CommitTran();
Flush();
}
public override async Task CommitAsync(CancellationToken cancellationToken = default)
{
await Ado.AsTenant().CommitTranAsync();
Flush();
}
public override void Dispose()
{
Ado.AsTenant().Close();
}
public override void Rollback()
{
Ado.AsTenant().RollbackTran();
}
public override async Task RollbackAsync(CancellationToken cancellationToken = default)
{
await Ado.AsTenant().RollbackTranAsync();
}
}/// <summary>
/// CAP 事务扩展类
/// </summary>
public static class CapSqlSugarTransactionExtensions
{
/// <summary>
/// 开启一个异步步的 CAP 事务。
/// </summary>
/// <param name="sugarClient">ISqlSugarClient 实例,用于操作数据库。</param>
/// <param name="publisher">ICapPublisher 实例,用于发布 CAP 消息。</param>
/// <param name="autoCommit">指示事务是否自动提交,默认为 false。</param>
/// <returns>返回一个 ICapTransaction 实例,表示 CAP 事务。</returns>
public static async Task<ICapTransaction> BeginCapTransactionAsync(this ISqlSugarClient sugarClient, ICapPublisher publisher, bool autoCommit = false)
{
var dispatcher = publisher.ServiceProvider.GetRequiredService<IDispatcher>();
await sugarClient.AsTenant().BeginTranAsync(IsolationLevel.ReadCommitted);
var transaction = new CapSqlSugarTransaction(dispatcher, sugarClient)
{
AutoCommit = autoCommit
};
return publisher.Transaction = transaction;
}
/// <summary>
/// 异步在 CAP 事务中执行指定的操作。
/// </summary>
/// <typeparam name="T">操作返回的结果类型。</typeparam>
/// <param name="db">ISqlSugarClient 实例,用于操作数据库。</param>
/// <param name="capPublisher">ICapPublisher 实例,用于发布 CAP 消息。</param>
/// <param name="action">要在事务中执行的异步操作。</param>
/// <param name="autoCommit">指示事务是否自动提交,默认为 false。</param>
/// <returns>返回操作的结果。</returns>
public static async Task<T> ExecuteInTransactionAsync<T>(this ISqlSugarClient db, ICapPublisher capPublisher, Func<Task<T>> action, bool autoCommit = false)
{
using (var trans = await db.BeginCapTransactionAsync(capPublisher, autoCommit))
{
try
{
var ret = await action();
if (!autoCommit)
{
await trans.CommitAsync();
}
return ret;
}
catch
{
await trans.RollbackAsync();
throw;
}
}
}
}//cap消息服务接口
public class BaseRabbitMQNew : ITransient
{
public ICapPublisher CapPublisher
{
get
{
return App.GetService<ICapPublisher>();
}
}
/// <summary>
/// 发送MQ消息
/// </summary>
/// <param name="routeKey">非必填参数,与MQAttribute特性,选择指定其一即可</param>
public void Publish(object data, string routeKey = "", bool immediate = false)
{
if (CapPublisher == null)
{
return;
}
if (string.IsNullOrWhiteSpace(routeKey))
{
var attr = data.GetType().GetCustomAttribute<MQAttribute>();
if (attr != null)
{
routeKey = attr.RouteKey;
}
}
if (string.IsNullOrWhiteSpace(routeKey))
{
return;
}
if (!immediate)
{
CapPublisher.PublishDelay<object>(TimeSpan.FromSeconds(5), routeKey, data);
return;
}
CapPublisher.Publish<object>(routeKey, data);
}
/// <summary>
/// 发送MQ消息
/// </summary>
/// <param name="routeKey">非必填参数,与MQAttribute特性,选择指定其一即可</param>
public async Task PublishAsync(object data, string routeKey = "", bool immediate = false)
{
if (CapPublisher == null)
{
return;
}
if (string.IsNullOrWhiteSpace(routeKey))
{
var attr = data.GetType().GetCustomAttribute<MQAttribute>();
if (attr != null)
{
routeKey = attr.RouteKey;
}
}
if (string.IsNullOrWhiteSpace(routeKey))
{
return;
}
if (!immediate)
{
await CapPublisher.PublishDelayAsync<object>(TimeSpan.FromSeconds(5), routeKey, data);
return;
}
await CapPublisher.PublishAsync<object>(routeKey, data);
}
}
--------------------------------------------------------------------消息对象特性定义----------------------------------------------------------------------------------------
//消息对象特性定义
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
public class MQAttribute : Attribute
{
/// <summary>
/// 消息队列Key
/// </summary>
public string RouteKey { get; set; }
public MQAttribute(string routeKey)
{
RouteKey = routeKey;
}
}
-----------------------------------------------------------------------数据库服务基类-------------------------------------------------------------------------------------
//数据库服务基类
public class BaseService : ITransient
{
private ISqlSugarClient db;
/// <summary>
/// 系统数据库
/// </summary>
protected ISqlSugarClient _db
{
get
{
if (App.HttpContext == null)
{
db = AppContextHolder.GetData("ThreadContextDB") as ISqlSugarClient;
}
if (db == null)
{
db = App.GetService<SqlSugarScope>();
}
return db;
}
}
/// <summary>
/// WMS数据库
/// </summary>
protected ISqlSugarClient _db_wms
{
get
{
if (_db is SqlSugarScope)
{
return _db.AsTenant().GetConnectionScope(ConstDatabase.WMS);
}
return _db.AsTenant().GetConnection(ConstDatabase.WMS);
}
}
}
------------------------------------------------------------------------------------------------------------------------------------------------------------
//测试代码
public class MqTestService : BaseService
{
private readonly BaseRabbitMQNew _rabbitMQ;
public MqTestService(BaseRabbitMQNew rabbitMQ)
{
//注入cap消息服务
_rabbitMQ = rabbitMQ;
}
[MQ("wms.test.customize.mq")]
public class TestCustomizeMqDto
{
public string code { get; set; }
public DateTime time { get; set; }
}
/// <summary>
/// 自定义事务测试
/// </summary>
/// <returns></returns>
public async Task<(bool, string)> CustomizeTestMqAsync()
{
TestCustomizeMqDto dto = new TestCustomizeMqDto()
{
code = "9999",
time = DateTime.Now
};
return await _db.ExecuteInTransactionAsync(_rabbitMQ.CapPublisher, async ()
=>
{
await _rabbitMQ.PublishAsync(dto);
var updateResult = await _db_wms.Updateable<WmsPrintLog>().SetColumns(item => new WmsPrintLog() { CreateTime = SqlFunc.GetDate() }, true).Where(item => item.Id == "2023100815091134013b25bf5a81b1d6").ExecuteCommandAsync();
if (updateResult > 0)
{
《!!!!!这里抛异常之后,数据库修改回滚成功了,但是消息没有回滚,发出去了》
throw Oops.Oh(ConstMessage.UpdateFail.FormatArgs($"打印日志"));
}
return (true, ConstMessage.Success);
}, false);
}
}问题:
自定义事务测试接口中,抛异常之后,数据库修改回滚成功了,但是消息没有回滚,发出去了
热忱回答(5)
-
fate sta VIP0
2025/8/12publisher.Transaction.Value
主要是这个东西传值。
0 回复 -
fate sta VIP0
2025/8/12原文中的AsyncLocal指的是这个。你看看设计是不是传值会有问题
0 回复 -
VIP0
2025/8/14弄好了吗楼主,我也是遇到这个问题,数据回滚了cap发出去了,没有回滚。
0 回复 -
fate sta VIP0
2025/8/15
@ :
0 回复 -
fate sta VIP0
2025/8/15他tran赋值错了。已解决
0 回复