用了 DbScoped.SugarScope 还是报 There is already an open DataRead 返回

SqlSugar
14 96
该叫什么 liftlei 发布于4天前
悬赏:5 飞吻

用的是仓库模式,主要部分截图如下,最后一个截图就是在线程里面执行的,然后就会报 There is already an open DataRead...,帮忙看看,谢谢!

image.png

image.png

image.png

image.png

热忱回答14

  • 局部代码看不出问是,提供完整用例,大概率异步用法问题,最上层方法 一定要是TASK类型 并且都有加AWAIT

    0 回复
  • @fate sta

    这是个多线程队列:

    using System;

    using System.Collections.Concurrent;

    using System.Collections.Generic;

    using System.Threading;

    using System.Threading.Tasks;


    namespace FastOrder.Core.Infrastructure.Extensions

    {

        public class TaskProcessQueue<T>

        {

            private readonly CancellationTokenSource _cts = new();

            private readonly CancellationToken _ct;

            private readonly BlockingCollection<T> _bc = new();

            private readonly List<Task> _tasks = new();

            private readonly int _threadCount = 1;

            private const int Processing = 1;

            private const int UnProcessing = 0;

            private int _isProcessing;


            public event Func<T, Task> ProcessEvent;

            public event Action<dynamic, Exception> ExceptionHandleEvent;


            public TaskProcessQueue()

            {

                _ct = _cts.Token;

            }

            public TaskProcessQueue(int threadCount) : this()

            {

                _threadCount = threadCount;

            }


            public void CompleteAdding()

            {

                _bc.CompleteAdding();

            }


            public void Stop()

            {

                _cts.Cancel();


                while (!_bc.IsCompleted)

                {

                    _bc.TryTake(out T _);

                }

            }


            /// <summary>

            /// 入队

            /// </summary>

            /// <param name="item"></param>

            public void Enqueue(T item)

            {

                if (item != null)

                {

                    _bc.Add(item);

                }


                Process();

            }


            private void Process()

            {

                if (!IsProcessing())

                {

                    AddThreads();

                    StartThreads();

                }

            }


            /// <summary>

            /// 判断是否队列有线程正在处理 

            /// </summary>

            /// <returns></returns>

            private bool IsProcessing()

            {

                return Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing) != UnProcessing;

            }


            private void AddThreads()

            {

                for (int i = 0; i < _threadCount; i++)

                {

                    Task task = new(() =>

                    {

                        while (!_ct.IsCancellationRequested && !_bc.IsCompleted)

                        {

                            try

                            {

                                T item = _bc.Take();

                                ProcessEvent(item);

                            }

                            catch (Exception ex)

                            {

                                HandleException(ex);

                            }

                        }

                    });


                    _tasks.Add(task);

                }

            }


            private void StartThreads()

            {

                foreach (Task task in _tasks)

                {

                    task.Start();

                }

            }


            private void HandleException(Exception ex)

            {

                Interlocked.CompareExchange(ref ExceptionHandleEvent, null, null);

                ExceptionHandleEvent?.Invoke(this, ex);

            }

        }

    }


    然后用委托执行:

     public class BankOrderFromFtpJob : QuartzJobBase, IJob

        {

            private readonly TaskProcessQueue<PrepareOrder> _processQueue = new(20);

            private readonly ILogger<BankOrderFromFtpJob> _logger;

            private readonly ISfExpressService _expressService;

            private readonly string serviceUrl;



            public BankOrderFromFtpJob(

                DbAppsettings dbAppsettings,

                ISysTaskService qzTasksService,

                ISfExpressService expressService,

                ILogger<BankOrderFromFtpJob> logger) : base(logger, qzTasksService)

            {

                _processQueue.ProcessEvent += ProcessQueue_ProcessEvent;

                _logger = logger;

                _expressService = expressService;

                serviceUrl = dbAppsettings["BSP_URL"];

            }


            public async Task Execute(IJobExecutionContext context)

            {

                await ExecuteJob(context, async () => { return await Run(context); });

            }


            public async Task<string> Run(IJobExecutionContext context)

            {

            }


            private async Task Work(PrepareOrder parm)

            {

                try

                {

       //在这里报错

                    if (await DbScoped.SugarScope.Queryable<PrepareOrder>().AnyAsync(t => t.Mailno == parm.Mailno))

                    {

                       

                        return;

                    }


                    

                }

                catch (SqlSugar.SqlSugarException dbEx)

                {

                   

                }

                catch (Exception _ex)

                {

                    

                }

            }




            private void ProcessQueue_ProcessEvent(PrepareOrder value)

            {

                Work(value).Wait();

            }

        }



    0 回复
  • 这种代码直接new sqlsugarclinet吧  , 明显上层不是TASK,这种情况 sqlsugarscope是会有问题

    0 回复
  • 开线程就不要用异步,用异步就不要开线程

    0 回复
  • @fate sta:为什么这样就正常呢?

    public class ProcessQueueTestJob : QuartzJobBase, IJob

        {

            private readonly TaskProcessQueue<PrepareOrder> _processQueue = new(10);

            private readonly Repository<PrepareOrder> repository;


            public ProcessQueueTestJob(ISysTaskService qzTasksService,

                ILogger<ProcessQueueTestJob> logger) : base(logger, qzTasksService)

            {

                _processQueue.ProcessEvent += ProcessQueue_ProcessEvent;

                repository = new Repository<PrepareOrder>();

            }


            public async Task Execute(IJobExecutionContext context)

            {

                await ExecuteJob(context, async () => { return await Run(context); });

            }


            public async Task<string> Run(IJobExecutionContext context)

            {

                List<PrepareOrder> list = new();


                await Task.Run(() =>

                {

                    for (int i = 0; i <= 9999; i++)

                    {

                        list.Add(new()

                        {

                            CreateTime = DateTime.Now,

                            Orderid = Guid.NewGuid().ToString("N")

                        });

                    }

                });


                foreach (var item in list)

                {

                    _processQueue.Enqueue(item);

                }


                _processQueue.CompleteAdding();

                return "OK";

            }


            private async Task ProcessQueue_ProcessEvent(PrepareOrder item)

            {

                var result = repository.InsertReturnEntityAsync(item).Result;

                await Task.Delay(200);

                result.Mailno = "SF123456789011";

                await repository.UpdateAsync(result, t => new { t.Mailno });

            }

        }


    0 回复
  • @liftlei: SqlSugarScope原理 : AsyncLocal 认为不一个异步上下文,就会NEW 出一个SqlSugarClient ,

    当AsyncLocal 无非正确的new sqlsugarclient那么就证明你异步有用法错误

    0 回复
  • 如果是新手我建议 只要出现 Task(()=>{ }) 这种操作 就不要用异步方法了,直接全部同步方法

    0 回复
  • @fate sta:可以了。

    在 public async Task<string> Run(IJobExecutionContext context) 里要new一个来用,不能用注入的那个对象,不然就会报错。。

    但是我在线程里面用 DbScoped.SugarScope 也是不行,不是 SqlSugarScope原理 : AsyncLocal 认为不一个异步上下文,就会NEW 出一个SqlSugarClient 吗? 还是我想得太复杂了。。

    0 回复
  • @fate sta

    如果是新手我建议 只要出现 Task(()=>{ }) 这种操作 就不要用异步方法了,直接全部同步方法

    在线程里面我用了同步方法也不行

    0 回复
  • @liftlei:你的异步写法有问题会导致导步上下文混乱

    0 回复
  • 这样SqlSugarScope是识别不了的

    0 回复
  • 直接new sqlsugarclient吧

    0 回复
  • 你如果最上层方法不是TASK ,直接new SqlSugarClient,像这种哪是异步


    image.png

    0 回复
  • 不在回复了,SqlSugarScope 上线到现在无一例BUG (唯一要求就是异步用法正确), 只要你按标准写法来 ,有疑问就按模版方式提供完整的能跑的无业务的DEMO

    0 回复