none
Socket 長時間連線的記憶體錯誤 RRS feed

  • 問題

  • 小弟目前要實作一支程式,需長時間接收機器傳來的資料,並將定期回傳訊息給機器(告知機器正在接收資料),
    目前的做法是利用 socket 的非同步的方式,進行收發資料。
    但在長時間持續接收資料後,必發生記憶體錯誤的問題。

    一開始是參考 Microsoft MSDN 上的 使用非同步伺服器通訊端(https://msdn.microsoft.com/zh-tw/library/5w7b7x5f(v=vs.110).aspx) 的範例
    但會出現 Out of memory 的問題,在 VS2015 的診斷工具中可以很清楚的看到使用的堆疊空間一直上升,
    查看記錄,發現是 Socket 內的 OverlappedData 過大造成的 memory spike

    在 google 上查到的資料是說,因為 socket 在沒有 close 之前,會 copy 一份資料到 overlappedData 內,
    直到 close socket 後,才會釋放 OverlappedData 內的資料。
    但因為目前做法是同一個連線中的資料,視為同一組資料,如果斷線再連入的連線,視為另一組資料。
    所以需長時間連線,無法進行 close 的動作

    所以小弟才改為現在的寫法 (程式碼在下方)
    雖然改為目前寫法後,程式使用的堆疊空間持續在固定的範圍內,
    但卻變成, 在BeginReceive 時, 會丟出 SocketException,錯誤訊息是「因為系統缺乏足夠的緩衝區空間或因為佇列已滿,所以無法在通訊端上執行操作。」

    小弟覺得應該是自己那邊的觀念錯誤,才會造成這個問題。
    雖然請出google 大神,但還是解決不了這個問題。

    請問各位先進前輩,小弟應該要從那方面下手去修正這個錯誤,
    或是有那些資料可以參考?
    或是我的功能只能用同步的寫法實現?

    小弟在藍色小鋪也有發問,但想說有些前輩只出現在其中一個論壇,所以在這也發了一次文,
    如果這樣不行,小弟會多加注意的,謝謝。

    using System;
    using System.Net.Sockets;
    
    namespace EUtilityTools.Communication.Network
    {
        public class AsyncSocket : IDisposable
        {
            #region 定義
    
            public delegate void DgSendCompeleted();
            public delegate void DgReceivedCompeleted(byte[] data, int length);
    
            private DgReceivedCompeleted _dgReceivedDoneCallback = null;
            public DgReceivedCompeleted ReceivedDoneCallback { set { _dgReceivedDoneCallback = value; } }
    
            public DgMessage MessageCallback { set; get; } = null;
    
            // worker socket
            public Socket WorkingSocket { internal set; get; }
    
            public event EventHandler<EventArguments.ConnectionCloseEvent> ConnectionCloseHandler;
    
            private bool _disposed = false;
    
            private byte[] _receivedBuffer = null;
            private AsyncCallback _asyncCallback = null;
    
            #endregion
    
            public void Dispose()
            {
                Dispose(true);
                GC.SuppressFinalize(this);
            }
    
            void Dispose(bool disposing)
            {
                if (!_disposed)
                {
                    if (disposing)
                    {
                        // Release managed resources
                        if (WorkingSocket != null)
                        {
                            WorkingSocket.Shutdown(SocketShutdown.Both);
                            WorkingSocket.Close();
                            WorkingSocket.Dispose();
                            WorkingSocket = null;
                        }
                    }
    
                    // Release unmanaged resources
    
                    _disposed = true;
                }
            }
    
            public AsyncSocket(Socket socket, int receiveBufferLength = 2048, bool isKeepAlive = true, int keepAliveTime = 5000, int checkAliveTime = 1000)
            {
                if (socket == null)
                    throw new Exception();
    
                WorkingSocket = socket;
    
                _receivedBuffer = new byte[receiveBufferLength];
                _asyncCallback = new AsyncCallback(ReceiveDataCallback);
    
                //if (isKeepAlive)
                WorkingSocket.IOControl(IOControlCode.KeepAliveValues, KeepAlive(isKeepAlive ? 1 : 0, keepAliveTime, checkAliveTime), null);
            }
    
            ~AsyncSocket()
            {
                Dispose(false);
            }
    
            public void SetKeepAlive(bool isOpen, int keepAliveTime = 5000, int interval = 5000)
            {
                if (WorkingSocket == null)
                    return;
    
                WorkingSocket.IOControl(IOControlCode.KeepAliveValues, KeepAlive(isOpen ? 1 : 0, keepAliveTime, interval), null);
            }
    
    
            /// <summary>
            /// 
            /// </summary>
            /// <param name="onOff">是否開啟 (開 1/ 關 0)</param>
            /// <param name="keepAliveTime">Alive後經過多久時間(ms)開啟偵測</param>
            /// <param name="keepAliveInterval">多久偵測一次(ms)</param>
            /// <returns></returns>
            private static byte[] KeepAlive(int onOff, int keepAliveTime, int keepAliveInterval)
            {
                byte[] buffer = new byte[12];
                BitConverter.GetBytes(onOff).CopyTo(buffer, 0);
                BitConverter.GetBytes(keepAliveTime).CopyTo(buffer, 4);
                BitConverter.GetBytes(keepAliveInterval).CopyTo(buffer, 8);
                return buffer;
            }
    
            public void Receive()
            {
                BeginReceive();
            }
    
            private void BeginReceive()
            {
                if (WorkingSocket == null)
                    return;
    
                try
                {
                    WorkingSocket.BeginReceive(
                        _receivedBuffer,
                        0,
                        _receivedBuffer.Length,
                        SocketFlags.None,
                       _asyncCallback,
                        null);
                }
                catch (ObjectDisposedException ex)
                {
                    OutMessage($"AsyncSocket.BeginReceive::ObjectDisposedException]: {WorkingSocket.RemoteEndPoint}{ex.Message}{Environment.NewLine}{Environment.StackTrace}");
                    NLog.LogManager.GetCurrentClassLogger().Error(ex, $"[EUtilityTools.Communication.AsysncSocket.BeginReceive] 發生 ObjectDisposedException");
    
                    Close(ex);
                }
                catch (SocketException ex)
                {
                    OutMessage($"AsyncSocket.BeginReceive::SocketException]: {WorkingSocket.RemoteEndPoint}{ex.Message}{Environment.NewLine}{Environment.StackTrace}");
                    NLog.LogManager.GetCurrentClassLogger().Error(ex, $"[EUtilityTools.Communication.AsysncSocket.BeginReceive] 發生 SocketException");
                    Close(ex);
                }
                catch (Exception ex)
                {
                    System.Diagnostics.Debug.WriteLine($"AsyncSocket.BeginReceive::Exception]: {WorkingSocket.RemoteEndPoint}{ex.Message}{Environment.NewLine}{Environment.StackTrace}");
                    NLog.LogManager.GetCurrentClassLogger().Error(ex, $"[EUtilityTools.Communication.AsysncSocket.BeginReceive] 發生 Exception");
    
                    Close(ex);
                }
            }
    
            private void ReceiveDataCallback(IAsyncResult result)
            {
                try
                {
                    int bytes = WorkingSocket.EndReceive(result);
    
                    if (bytes > 0)
                    {
                        //  callback 接收資料
                        // state.Data.Write(state.buffer, 0, bytes);
    
                        if (_dgReceivedDoneCallback != null)
                        {
                            byte[] data = new byte[bytes];
    
                            Buffer.BlockCopy(_receivedBuffer, 0, data, 0, bytes);
                            _dgReceivedDoneCallback(data, bytes);
                        }
    
                        //  接收資料小於 Buffer size,表示資料接收完成
                        //  但,若填滿 Buffer 空間,可能未完整接資資料,需要再次接收資料
                        BeginReceive();
                    }
                    else
                    {
                        Close(new Exception("連線己關閉"));
                    }
    
    
                }
                catch (ObjectDisposedException ex)
                {
                    NLog.LogManager.GetCurrentClassLogger().Error(ex, $"[EUtilityTools.Communication.AsysncSocket.ReceiveDataCallback] 發生 ObjectDisponsedException");
                    OutMessage($"[AsysncSocket.ReceiveDataCallback::ObjectDisponsedException]: {WorkingSocket.RemoteEndPoint}{ex.Message}{Environment.NewLine}{Environment.StackTrace}");
    
                    Close();
                }
                catch (SocketException ex)
                {
                    NLog.LogManager.GetCurrentClassLogger().Error(ex, $"[EUtilityTools.Communication.AsysncSocket.ReceiveDataCallback] 發生 SocketException");
                    OutMessage($"[AsysncSocket.ReceiveDataCallback::SocketException]: {WorkingSocket.RemoteEndPoint}{ex.Message}{Environment.NewLine}{Environment.StackTrace}");
    
                    Close(ex);
                }
                catch (Exception ex)
                {
                    NLog.LogManager.GetCurrentClassLogger().Error(ex, $"[EUtilityTools.Communication.AsysncSocket.ReceiveDataCallback] 發生 Exception");
                    OutMessage($"AsyncSocket.ReceiveDataCallback::Exception]: {WorkingSocket.RemoteEndPoint}{ex.Message}{Environment.NewLine}{Environment.StackTrace}");
    
                    Close(ex);
                }
            }
    
            /// <summary>
            /// 寄出資料
            /// </summary>
            /// <param name="data"></param>
            /// <param name="offset"></param>
            /// <param name="length"></param>
            /// <param name="callback"></param>
            public bool Send(byte[] data, int offset, int length, DgSendCompeleted callback)
            {
                if (WorkingSocket == null)
                    return false;
    
                bool res = false;
    
                try
                {
    
                    WorkingSocket.BeginSend(
                        data,
                        offset,
                        length,
                        SocketFlags.None,
                        new AsyncCallback(SendDataCallback),
                        callback);
    
                    res = true;
    
                }
                catch (ObjectDisposedException ex)
                {
                    //The Socket object has been closed.
                    OutMessage($"[AsysncSocket.Send::ObjectDisposedException]: {WorkingSocket.RemoteEndPoint}{ex.Message}{Environment.NewLine}{Environment.StackTrace}");
                    NLog.LogManager.GetCurrentClassLogger().Error(ex, $"[EUtilityTools.Communication.AsysncSocket.Send] 發生 ObjectDisposedException");
    
                    Close(ex);
                }
                catch (SocketException ex)
                {
                    //  An error occurred when attempting to access the socket.
                    OutMessage($"AsyncSocket.Send::SocketException]: {WorkingSocket.RemoteEndPoint}{ex.Message}{Environment.NewLine}{Environment.StackTrace}");
                    NLog.LogManager.GetCurrentClassLogger().Error(ex, $"[EUtilityTools.Communication.AsysncSocket.Send] 發生 SocketException");
    
                    Close(ex);
                }
                catch (Exception ex)
                {
                    OutMessage($"AsyncSocket.Send::Expection]: {WorkingSocket.RemoteEndPoint}{ex.Message}{Environment.NewLine}{Environment.StackTrace}");
                    NLog.LogManager.GetCurrentClassLogger().Error(ex, $"[EUtilityTools.Communication.AsysncSocket.Send] 發生 Exception");
    
                    Close(ex);
                }
    
                return res;
            }
    
            private void SendDataCallback(IAsyncResult result)
            {
                try
                {
                    DgSendCompeleted callback = (DgSendCompeleted)result.AsyncState;
    
                    int sendBytes = WorkingSocket.EndSend(result);
    
                    //  呼叫 EndReceive 方法以成功完成讀取作業並傳回讀取的位元組數. 並一直封鎖到有資料可用為止。
                    //  若回傳零位元組,表示己關閉 socekt 連接。
                    if (sendBytes > 0)
                    {
                        if (callback != null)
                            callback();
                    }
                    else
                    {
                        Close(new Exception("連線己經被關閉"));
                    }
                }
                catch (ObjectDisposedException ex)
                {
                    OutMessage($"[AsysncSocket.SendDataCallback::ObjectDisposedException]: {WorkingSocket.RemoteEndPoint}{ex.Message}{Environment.NewLine}{Environment.StackTrace}");
                    NLog.LogManager.GetCurrentClassLogger().Error(ex, $"[EUtilityTools.Communication.AsysncSocket.SendDataCallback] 發生 ObjectDisposedException");
    
                    Close(ex);
                }
                catch (SocketException ex)
                {
                    NLog.LogManager.GetCurrentClassLogger().Error(ex, $"[EUtilityTools.Communication.AsysncSocket.SendDataCallback] 發生 SocketException");
    
                    OutMessage($"[AsysncSocket.SendDataCallback::SocketException]: {ex.ErrorCode}: {WorkingSocket.RemoteEndPoint}{ex.Message}{Environment.NewLine}{Environment.StackTrace}");
    
                    Close(ex);
                }
                catch (Exception ex)
                {
                    System.Diagnostics.Debug.WriteLine($"AsyncSocket.SendDataCallback::Exception]: {WorkingSocket.RemoteEndPoint}{ex.Message}{Environment.NewLine}{Environment.StackTrace}");
                    NLog.LogManager.GetCurrentClassLogger().Error(ex, $"[EUtilityTools.Communication.AsysncSocket.SendDataCallback] 發生 Exception");
    
                    Close(ex);
                }
            }
    
            public void Close()
            {
                Exception ex = new Exception("正常關閉");
    
                Close(ex);
            }
            public void Close(Exception ex)
            {
                System.Diagnostics.Debug.WriteLine($"[AsyncSocket.Close]: 進入 close().");
    
                try
                {
                    NLog.LogManager.GetCurrentClassLogger().Trace(ex.Message);
    
                    if (!WorkingSocket.Connected)
                    {
                        OutMessage($"[AsyncSocket.Close]: {WorkingSocket.RemoteEndPoint} Socket 己經被關閉");
                        NLog.LogManager.GetCurrentClassLogger().Trace("[EUtilityTools.Communication.AsysncSocket.Close] 連線己被關閉");
                        return;
                    }
    
                    WorkingSocket.Shutdown(SocketShutdown.Both);
                    WorkingSocket.Close();
                    WorkingSocket.Dispose();
                    WorkingSocket = null;
    
                    //raise event
                    if (ConnectionCloseHandler != null)
                    {
                        OutMessage("[AsysncSocket.Close]: Enter ConnectionCloseHandler function.");
                        NLog.LogManager.GetCurrentClassLogger().Trace("[EUtilityTools.Communication.AsysncSocket.Close] 回傳連線關閉事件");
    
                        EventArguments.ConnectionCloseEvent arg = new EventArguments.ConnectionCloseEvent(ex);
                        ConnectionCloseHandler(this, arg);
                    }
                }
                catch (Exception e)
                {
                    //ignore socket close error
                    OutMessage($"[AsysncSocket.Close::Exception]:{e.Message}{Environment.NewLine}{Environment.StackTrace}");
                    NLog.LogManager.GetCurrentClassLogger().Error(ex, "[EUtilityTools.Communication.AsysncSocket.Close] 連線關閉發生錯誤");
                }
            }
    
            protected void OutMessage(string msg)
            {
                MessageCallback?.Invoke(msg);
                System.Diagnostics.Debug.WriteLine(msg);
                NLog.LogManager.GetCurrentClassLogger().Info(msg);
            }
    
        }
    }



    2015年11月5日 上午 07:11

解答

  • 如果是 BeginReceive 沒有對應的 EndReceive 是會卡住沒錯.


    在現實生活中,你和誰在一起的確很重要,甚至能改變你的成長軌跡,決定你的人生成敗。 和什麼樣的人在一起,就會有什麼樣的人生。 和勤奮的人在一起,你不會懶惰; 和積極的人在一起,你不會消沈; 與智者同行,你會不同凡響; 與高人為伍,你能登上巔峰。

    • 已標示為解答 依恩 2015年11月6日 上午 08:20
    2015年11月6日 上午 08:12
    版主

所有回覆

  • 關於記憶體觀念部分:[.Net] 資料接收的常識

    不精確的問法,就會得到隨便猜的答案;自己都不肯花時間好好描述問題,又何必期望網友會認真回答?

    2015年11月5日 下午 02:54
  • 大約有多少 client ? 多長時間會造成out of memory ?, 你使用的 .net Framework 版本 ?

    在現實生活中,你和誰在一起的確很重要,甚至能改變你的成長軌跡,決定你的人生成敗。 和什麼樣的人在一起,就會有什麼樣的人生。 和勤奮的人在一起,你不會懶惰; 和積極的人在一起,你不會消沈; 與智者同行,你會不同凡響; 與高人為伍,你能登上巔峰。

    2015年11月5日 下午 05:10
    版主
  • 謝謝 心冷熱情熄 與 Bill 大大的回應

    to 心冷熱情熄 大大

    謝謝您的資料,這篇文章小弟會好好研究一下。

    ps. 一看到您回應的簽名檔,還以為是小弟提問的描述不清楚,後來才發現是簽名檔

    to Bill 大大

    預計最多會接到64台, 但在測試時,只有接2台,約3~4小時左右會出現 out of memory 的現象。

    小弟我目前用是.net framework 4.5 的版本,

    小弟推測..
    是否是在程式內,沒在注意到的地方,多次呼收 Socket.BeginReceive() 或 Socket.BeginSend() 的錯誤?

    2015年11月6日 上午 03:11
  • 我還在測試, 昨天測試大概 20 個 client , 一小時多, server 程式的記憶體還很正常, 反而是 Visual Studio 2013 的診斷工具先掛了. 查了一些網路上的文章, 說法和你講得差不多, 可是我沒有測出這個現象. 不過我習慣是接收使用非同步, 發送使用同步.

    我再進一步測測看.


    在現實生活中,你和誰在一起的確很重要,甚至能改變你的成長軌跡,決定你的人生成敗。 和什麼樣的人在一起,就會有什麼樣的人生。 和勤奮的人在一起,你不會懶惰; 和積極的人在一起,你不會消沈; 與智者同行,你會不同凡響; 與高人為伍,你能登上巔峰。


    2015年11月6日 上午 06:19
    版主
  • 真是麻煩到Bill 大大了

    小弟好像找到問題了. 
    目前持續接收四小時,記憶體與診斷工具內的堆積在固定範圍內變化,
    還在持續觀察中。

    問題應該是出在...
    在 ReceiveDataCallback 這個函數中,除了會 Callback 接收的資料外後,會再次 Receive()
    好死不死, 在 Callback 的函數之中,又呼叫了一次 Receive()
    結果造成多次呼叫 Socket.BeginReceive() ,與Socket.EndReceive() 不對等
    導至 Socket 不斷開新的接收線程,無法結束與釋放使用的記憶。
    最後造成記憶體錯誤的問題。

    結果這麼一個的錯誤,找了三週的時間才發現。

    謝謝 Bill 大大 與 心冷熱情熄大大 的幫忙

    2015年11月6日 上午 08:00
  • 如果是 BeginReceive 沒有對應的 EndReceive 是會卡住沒錯.


    在現實生活中,你和誰在一起的確很重要,甚至能改變你的成長軌跡,決定你的人生成敗。 和什麼樣的人在一起,就會有什麼樣的人生。 和勤奮的人在一起,你不會懶惰; 和積極的人在一起,你不會消沈; 與智者同行,你會不同凡響; 與高人為伍,你能登上巔峰。

    • 已標示為解答 依恩 2015年11月6日 上午 08:20
    2015年11月6日 上午 08:12
    版主
  • 為了符合你的測試, 我做了一個三個小時的實驗

    (1) client 數量: 1000

    (2) 每個 client 在 40~50 秒間會送出 320 bytes 左右的資料, 也就是 1000 個 client 加起來約是 300KB.

    (3) 在185分鐘左右的時間, 總共的送出次數超過 200 次

    (4) 以下是我在關閉 server 前的記憶體快照圖, 看起來應該是沒有 memory leak 的現象 (如果你後續還有問題, 歡迎繼續來這邊討論, 我對你這個課題很有興趣 )


    在現實生活中,你和誰在一起的確很重要,甚至能改變你的成長軌跡,決定你的人生成敗。 和什麼樣的人在一起,就會有什麼樣的人生。 和勤奮的人在一起,你不會懶惰; 和積極的人在一起,你不會消沈; 與智者同行,你會不同凡響; 與高人為伍,你能登上巔峰。


    2015年11月6日 下午 05:37
    版主
  • Hi 

    我有跟樓主一樣的需求

    也碰到同樣問題 我在readcallback中 不中斷連線 處理完資料後 繼續讓他 beginreceive 以接收同一個connection的資料

    但在壓力測試 大量傳輸後 就碰到跟樓主同樣問題

    請問樓主後來是怎麼解決的呢???

    2018年12月7日 上午 01:50
  • 出問題時,用工作管理員看一下有使用到 SerialPort 那隻程式的執行緒,是不是超過 500 條就掛...

    不精確的問法,就會得到隨便猜的答案;自己都不肯花時間好好描述問題,又何必期望網友會認真回答?

    2018年12月15日 下午 04:09