三五秒执行一次大数据修改会异常报错 返回

SqlSugar 沟通中
11 227

1699926814543.png 我用 WebSocket 接收数据,每三五秒执行一次大数据量修改,每次修改的条数大约在一万五千条左右,但是每次都是运行几个小时之后才会报错。

热忱回答11

  • 加上CopyNew().Fastest<xx> 试一下

    0 回复
  • 好的,我试试

    0 回复
  • 1699938469090.jpg@fate sta:还是报错

    0 回复
  • 那就需要提供完整可以重现的DEMO 

    https://www.donet5.com/Home/Doc?typeId=2366 这个是提问模版

    0 回复
  • namespace HX.Dispach.Emergency
    {
        [ApiDescriptionSettings(Tag = "Dispatch", Name = "WebSocket", Order = 200)]
        [Route("Dispatch/[controller]")]
        public class WebSocketService : IDynamicApiController, ITransient
        {
            /// <summary>
            /// Repository for <see cref="PersonBingrecordEntity"/>. The methods of this class use its
            /// <c>AsSugarClient()</c> to query and update other entity tables.
            /// </summary>
            private readonly ISqlSugarRepository<PersonBingrecordEntity> _repository;
            /// <summary>
            /// Code-generation data conversion.
            /// </summary>
            private readonly CodeGenDataConversion _codeGenDataConversion;
    
            /// <summary>
            /// User manager.
            /// </summary>
            private readonly IUserManager _userManager;
    
            /// <summary>
            /// Cache manager.
            /// </summary>
            private readonly ICacheManager _cacheManager;
    
            /// <summary>
            /// SqlSugar client; the constructor stores the injected <c>ISqlSugarClient</c> here after
            /// casting it to <see cref="SqlSugarScope"/>.
            /// </summary>
            private SqlSugarScope _sqlSugarClient;
    
            // NOTE(review): this readonly field is not assigned by the constructor of this class.
            private readonly WebSocketHandler _webSocketHandler;
    
            /// <summary>
            /// Database manager.
            /// </summary>
            private readonly IDataBaseManager _dataBaseManager;
    
            /// <summary>
            /// Template method.
            /// </summary>
            /// <remarks>
            /// NOTE(review): this readonly field is not assigned by the constructor of this class.
            /// </remarks>
            private readonly AbstractConsulDispatcher _abstractConsulDispatcher;
    
            /// <summary>
            /// 初始化一个<see cref="WebSocketService"/>类型的新实例.
            /// </summary>
            public WebSocketService(
                ISqlSugarRepository<PersonBingrecordEntity> repository,
                CodeGenDataConversion codeGenDataConversion,
                ICacheManager cacheManager,
                IDataBaseManager dataBaseManager,
                ISqlSugarClient context,
                IUserManager userManager)
            {
                _repository = repository;
                _codeGenDataConversion = codeGenDataConversion;
                _dataBaseManager = dataBaseManager;
                _userManager = userManager;
                _sqlSugarClient = (SqlSugarScope)context;
                _cacheManager = cacheManager;
            }
    
            // Shared client socket. It is assigned in WebsokectClient, ClientKeepAlive and
            // WebSocket4Net_MessageReceived, and used for sending by ClientSendMsgToServer and WebSocket4Net_Opened.
            // NOTE(review): these statics are public, mutable and shared by every instance of this class,
            // and no synchronization around them is visible in this class.
            public static WebSocket4Net.WebSocket webSocket4Net = null;
            // WebSocket endpoint URL including the token and query; built by GetDangerMonit.
            public static string WebIpAddr = string.Empty;
    
            // Buffer that accumulates message fragments until a fragment containing "}}" arrives
            // (see WebSocket4Net_MessageReceived).
            public static string lists = string.Empty;
            // Latest payload per node id, filled by WebSocket4Net_MessageReceived and passed to Update.
            public static Dictionary<string, data> keys = new Dictionary<string, data>();
            // Counter of unfragmented messages; WebSocket4Net_MessageReceived calls Update when it
            // equals 10 and then resets it to 0.
            public static int i = 0;
    
            public void GetDangerMonit()
            {
                List<Query> job = new List<Query>();
                var Jcsj = _repository.AsSugarClient().Queryable<DangerMonitEntity>().ToList();
                foreach (var item in Jcsj)
                {
                    Query query = new Query();
                    query.allTag = true;
                    query.id = item.Code;
                    job.Add(query);
                }
                WebIpAddr = string.Format("{0}{1}", "ws://xxxxxxx:31000/core/ws/data", string.Format("{0}{1}", "?token=" + RedisHelper.Get("htkj-Token") + "&query=", SerializeToJson(job)));
            }
    
            /// <summary>
            /// 自控websoket实时接收数据.
            /// </summary>
            /// <returns></returns>
            [HttpGet("WebsokectClient")]
            [AllowAnonymous]
            [IgnoreLog]
            public void WebsokectClient()
            {
                if (string.IsNullOrWhiteSpace(WebIpAddr))
                {
                    GetDangerMonit();
                }
                webSocket4Net = new WebSocket4Net.WebSocket(WebIpAddr);
                webSocket4Net.Opened += WebSocket4Net_Opened;
                webSocket4Net.MessageReceived += WebSocket4Net_MessageReceived;
                webSocket4Net.Open();
                Thread thread = new Thread(ClientSendMsgToServer);
                thread.IsBackground = true;
                thread.Start();
    
                Thread keepalive = new Thread(ClientKeepAlive);
                keepalive.IsBackground = true;
                keepalive.Start();
    
                while (true)
                {
                    Thread.Sleep(500000);
                }
            }
            private void ClientSendMsgToServer()
            {
                while (true)
                {
                    webSocket4Net.Send("cs");
                    Thread.Sleep(TimeSpan.FromSeconds(5));
                }
            }
            private void ClientKeepAlive()
            {
                while (true)
                {
    
                    Thread.Sleep(TimeSpan.FromSeconds(10));
                    if (webSocket4Net.State == WebSocket4Net.WebSocketState.Closed)
                    {
                        Console.WriteLine("服务器连接错误,正在尝试重新连接");
                        JNPF.Dispach.logHelper.writeLog(webSocket4Net.State.ToString(), "进入服务器连接错误", "服务器连接错误,正在尝试重新连接");
                        CeShi();
                        GetDangerMonit();
                        webSocket4Net = new WebSocket4Net.WebSocket(WebIpAddr);
                        try
                        {
                            webSocket4Net.Open();
                        }
                        catch (Exception ex)
                        {
                            JNPF.Dispach.logHelper.writeLog(webSocket4Net.State.ToString(), "进入服务器连接错的方法", "没问题");
                        }
                    }
                    /*webSocket4Net.EnableAutoSendPing;
                    webSocket4Net.Send("KeepAlive");
                    webSocket4Net.MessageReceived += ClientReceive;*/
                }
            }
    
            private void WebSocket4Net_MessageReceived(object sender, MessageReceivedEventArgs e)
            {
                 JNPF.Dispach.logHelper.writeLog(e.Message, "连接成功方法的内容", "没问题");
                 Msg msg = new Msg();
                 if (e.Message.Contains("获取当前用户ID失败"))
                 {
                     CeShi();
                     GetDangerMonit();
                     webSocket4Net = new WebSocket4Net.WebSocket(WebIpAddr);
                 }
                 if (!e.Message.Contains("}}") && string.IsNullOrWhiteSpace(lists))
                 {
                     lists = e.Message;
                 }
                 else if (!e.Message.Contains("}}") && !string.IsNullOrWhiteSpace(lists))
                 {
                     lists = lists + e.Message;
                 }
                 else if (e.Message.Contains("}}") && !string.IsNullOrWhiteSpace(lists))
                 {
                     lists = lists + e.Message;
                     msg = lists.ToObject<Msg>();
                     lists = string.Empty;
                     var list = msg.data;
                     if (!keys.Keys.Contains(msg.data.nodeId))
                     {
                         keys.Add(msg.data.nodeId, list);
                     }
                     else
                     {
                         keys[msg.data.nodeId] = list;
                     }
                     Update(RedisHelper.Get("Zxliebiao").ToObject<List<DangerMonitEntity>>(), keys);
                 }
                 else if (e.Message.Contains("}}") && string.IsNullOrWhiteSpace(lists))
                 {
                     i++;
                     msg = e.Message.ToObject<Msg>();
                     var list = msg.data;
                     if (!keys.Keys.Contains(msg.data.nodeId))
                     {
                         keys.Add(msg.data.nodeId, list);
                     }
                     else
                     {
                         keys[msg.data.nodeId] = list;
                     }
                     if (i == 10)
                     {
                         Update(RedisHelper.Get("Zxliebiao").ToObject<List<DangerMonitEntity>>(), keys);
                         i = 0;
                     }
                     lists = string.Empty;
                 }
                
            }
    
            private static void WebSocket4Net_Opened(object sender, EventArgs e)
            {
                Console.WriteLine("来自客户端,webSocket Client_Opened,客户端准备发送数据!");
                webSocket4Net.Send("来自客户端,准备发送数据!");
            }
    
            private static void CeShi()
            {
                var ss = HttpGeted(App.Configuration["ZkJk"] + "core/auth/token?appkey={key}&appsecret={appsecret}");
                JObject job = JObject.Parse(ss);
                RedisHelper.Set("htkj-Token", job["token"].ToString());
            }
            public static string HttpGeted(string url)
            {
                Encoding encoding = Encoding.UTF8;
                HttpWebRequest request = (HttpWebRequest)WebRequest.Create(url);
                request.Method = "GET";
                request.Accept = "text/html, application/xhtml+xml, */*";
                request.ContentType = "application/json";
                //request.Headers.Add("Authorization", RedisHelper.Get("htkj-Token"));
                HttpWebResponse response = (HttpWebResponse)request.GetResponse();
                using (StreamReader reader = new StreamReader(response.GetResponseStream(), Encoding.UTF8))
                {
                    return reader.ReadToEnd();
                }
            }
            private void Update(List<DangerMonitEntity> state, Dictionary<string, data> keys)
            {
                List<DangerRealtimeEntity> dangers = new List<DangerRealtimeEntity>();
                if (string.IsNullOrWhiteSpace(RedisHelper.Get("DangerRealtimeEntity")) || string.IsNullOrWhiteSpace(RedisHelper.Get("DangerTargetEntity")))
                {
                    RedisHelper.Set("DangerRealtimeEntity", ( _repository.AsSugarClient().Queryable<DangerRealtimeEntity>().Where(x => x.Deletemark == 0).ToList()).ToJsonString());
                    RedisHelper.Set("DangerTargetEntity",  ( _repository.AsSugarClient().Queryable<DangerTargetEntity>().Where(x => x.Deletemark == 0).ToList()).ToJsonString());
                }
                var data = RedisHelper.Get("DangerRealtimeEntity").ToObject<List<DangerRealtimeEntity>>();
                var datas = RedisHelper.Get("DangerTargetEntity").ToObject<List<DangerTargetEntity>>();
                //var data = _repository.AsSugarClient().Queryable<DangerRealtimeEntity>().Where(x => x.Deletemark == 0).ToList();
                //var datas = _repository.AsSugarClient().Queryable<DangerTargetEntity>().Where(x => x.Deletemark == 0).ToList();
                foreach (var item1 in keys)
                {
                    foreach (var item2 in item1.Value.fields)
                    {
                        DangerRealtimeEntity cjsJ = new DangerRealtimeEntity();
                        cjsJ.Id = data.Find(t => t.IndexCode.Equals(item2.Key) && t.Company.Equals(state.Find(x => x.Code.Equals(item1.Key))?.Company))?.Id;
                        //开关量取反
                        if (datas.Find(t => t.Code.Equals(item2.Key) && t.Company.Equals(state.Find(x => x.Code.Equals(item1.Key))?.Company))?.Invertornot == 1 && datas.Find(t => t.Code.Equals(item2.Key) && t.Company.Equals(state.Find(x => x.Code.Equals(item1.Key))?.Company))?.Xhlx == "1")
                        {
                            if (item2.Value.Equals("0"))
                            {
                                cjsJ.Value = "1";
                            }
                            else
                            {
                                cjsJ.Value = "0";
                            }
                            cjsJ.AcquisitionTime = item1.Value.time.ToString().TimeStampToDateTime();
                            if (cjsJ.Id != null)
                            {
                                dangers.Add(cjsJ);
                            }
                        }
                        else
                        {
                            cjsJ.Value = item2.Value;
                            cjsJ.AcquisitionTime = item1.Value.time.ToString().TimeStampToDateTime();
                            if (cjsJ.Id != null)
                            {
                                dangers.Add(cjsJ);
                            }
                        }
                    }
                }
                var isOks = _repository.AsSugarClient().CopyNew().Fastest<DangerRealtimeEntity>().BulkUpdate(dangers, new string[] { "ID" }, new string[] { "VALUE", "ACQUISITION_TIME" });
                //var isOks = _repository.AsSugarClient().Updateable(dangers).UpdateColumns(it => new
                //{
                //    it.Value,
                //    it.AcquisitionTime,
                //}).ExecuteCommand();
                dangers.Clear();
                JNPF.Dispach.logHelper.writeLog(isOks.ToString(), "修改条数", "没问题");
            }
        }
    }


    0 回复
  • 你这个只是局部代码,请整理成能直接运行的完整项目,删掉 obj 和 bin 目录后打包成 rar

    0 回复
  • 1

    0 回复
  • @派大星.:我们有些底层代码涉及到机密,只能给部分代码,能分析下哪里出现的问题嘛?

    0 回复
  • @派大星.: 你写DEMO重现,不依赖现有代码

    0 回复
  • 还有一种可能就是 list 对象是线程不安全的,是不是被全局(静态)引用了?

    可以先把 list 序列化成字符串,然后再反序列化成新的对象,再传进去做插入测试

    0 回复
  • 或者 dangers = dangers.Adapt<List<T>>(); 重新做一次 DTO 映射

    0 回复