none
SocketAsyncEventArgs类 异步Socket 如何创建和使用BufferManager RRS feed

  • 问题

  • 各位大侠们!小弟刚开始学习SocketAsyncEventArgs类,有很多地方不太明白像大侠们请教!

    我们用MSDN上提供的实例来说明,请看例子。

    class Server
        {
            private int m_numConnections;   // the maximum number of connections the sample is designed to handle simultaneously 
            private int m_receiveBufferSize;// buffer size to use for each socket I/O operation 
            BufferManager m_bufferManager;  // represents a large reusable set of buffers for all socket operations
            const int opsToPreAlloc = 2;    // read, write (don't alloc buffer space for accepts)
            Socket listenSocket;            // the socket used to listen for incoming connection requests
            // pool of reusable SocketAsyncEventArgs objects for write, read and accept socket operations
            SocketAsyncEventArgsPool m_readWritePool;
            int m_totalBytesRead;           // counter of the total # bytes received by the server
            int m_numConnectedSockets;      // the total number of clients connected to the server 
            Semaphore m_maxNumberAcceptedClients;
    
            // Create an uninitialized server instance.  
            // To start the server listening for connection requests
            // call the Init method followed by Start method 
            //
            // <param name="numConnections">the maximum number of connections the sample is designed to handle simultaneously</param>
            // <param name="receiveBufferSize">buffer size to use for each socket I/O operation</param>
            public Server(int numConnections, int receiveBufferSize)
            {
                m_totalBytesRead = 0;
                m_numConnectedSockets = 0;
                m_numConnections = numConnections;
                m_receiveBufferSize = receiveBufferSize;
                // allocate buffers such that the maximum number of sockets can have one outstanding read and 
                //write posted to the socket simultaneously  
                m_bufferManager = new BufferManager(receiveBufferSize * numConnections * opsToPreAlloc,
                    receiveBufferSize);
    
                m_readWritePool = new SocketAsyncEventArgsPool(numConnections);
                m_maxNumberAcceptedClients = new Semaphore(numConnections, numConnections);
            }
    
            // Initializes the server by preallocating reusable buffers and 
            // context objects.  These objects do not need to be preallocated 
            // or reused, but it is done this way to illustrate how the API can 
            // easily be used to create reusable objects to increase server performance.
            //
            public void Init()
            {
                // Allocates one large byte buffer which all I/O operations use a piece of.  This gaurds 
                // against memory fragmentation
                m_bufferManager.InitBuffer();
    
                // preallocate pool of SocketAsyncEventArgs objects
                SocketAsyncEventArgs readWriteEventArg;
    
                for (int i = 0; i < m_numConnections; i++)
                {
                    //Pre-allocate a set of reusable SocketAsyncEventArgs
                    readWriteEventArg = new SocketAsyncEventArgs();
                    readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
                    readWriteEventArg.UserToken = new AsyncUserToken();
    
                    // assign a byte buffer from the buffer pool to the SocketAsyncEventArg object
                    m_bufferManager.SetBuffer(readWriteEventArg);
    
                    // add SocketAsyncEventArg to the pool
                    m_readWritePool.Push(readWriteEventArg);
                }
    
            }
    
            // Starts the server such that it is listening for 
            // incoming connection requests.    
            //
            // <param name="localEndPoint">The endpoint which the server will listening 
            // for connection requests on</param>
            public void Start(IPEndPoint localEndPoint)
            {
                // create the socket which listens for incoming connections
                listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                listenSocket.Bind(localEndPoint);
                // start the server with a listen backlog of 100 connections
                listenSocket.Listen(100);
    
                // post accepts on the listening socket
                StartAccept(null);
    
                //Console.WriteLine("{0} connected sockets with one outstanding receive posted to each....press any key", m_outstandingReadCount);
                Console.WriteLine("Press any key to terminate the server process....");
                Console.ReadKey();
            }
    
    
            // Begins an operation to accept a connection request from the client 
            //
            // <param name="acceptEventArg">The context object to use when issuing 
            // the accept operation on the server's listening socket</param>
            public void StartAccept(SocketAsyncEventArgs acceptEventArg)
            {
                if (acceptEventArg == null)
                {
                    acceptEventArg = new SocketAsyncEventArgs();
                    acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
                }
                else
                {
                    // socket must be cleared since the context object is being reused
                    acceptEventArg.AcceptSocket = null;
                }
    
                m_maxNumberAcceptedClients.WaitOne();
                bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
                if (!willRaiseEvent)
                {
                    ProcessAccept(acceptEventArg);
                }
            }
    
            // This method is the callback method associated with Socket.AcceptAsync 
            // operations and is invoked when an accept operation is complete
            //
            void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
            {
                ProcessAccept(e);
            }
    
            private void ProcessAccept(SocketAsyncEventArgs e)
            {
                Interlocked.Increment(ref m_numConnectedSockets);
                Console.WriteLine("Client connection accepted. There are {0} clients connected to the server",
                    m_numConnectedSockets);
    
                // Get the socket for the accepted client connection and put it into the 
                //ReadEventArg object user token
                SocketAsyncEventArgs readEventArgs = m_readWritePool.Pop();
                ((AsyncUserToken)readEventArgs.UserToken).Socket = e.AcceptSocket;
    
                // As soon as the client is connected, post a receive to the connection
                bool willRaiseEvent = e.AcceptSocket.ReceiveAsync(readEventArgs);
                if (!willRaiseEvent)
                {
                    ProcessReceive(readEventArgs);
                }
    
                // Accept the next connection request
                StartAccept(e);
            }
    
            // This method is called whenever a receive or send operation is completed on a socket 
            //
            // <param name="e">SocketAsyncEventArg associated with the completed receive operation</param>
            void IO_Completed(object sender, SocketAsyncEventArgs e)
            {
                // determine which type of operation just completed and call the associated handler
                switch (e.LastOperation)
                {
                    case SocketAsyncOperation.Receive:
                        ProcessReceive(e);
                        break;
                    case SocketAsyncOperation.Send:
                        ProcessSend(e);
                        break;
                    default:
                        throw new ArgumentException("The last operation completed on the socket was not a receive or send");
                }
    
            }
    
            // This method is invoked when an asynchronous receive operation completes. 
            // If the remote host closed the connection, then the socket is closed.  
            // If data was received then the data is echoed back to the client.
            //
            private void ProcessReceive(SocketAsyncEventArgs e)
            {
                // check if the remote host closed the connection
                AsyncUserToken token = (AsyncUserToken)e.UserToken;
                if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
                {
                    //increment the count of the total bytes receive by the server
                    Interlocked.Add(ref m_totalBytesRead, e.BytesTransferred);
                    Console.WriteLine("The server has read a total of {0} bytes", m_totalBytesRead);
    
                    //echo the data received back to the client
                    e.SetBuffer(e.Offset, e.BytesTransferred);
                    bool willRaiseEvent = token.Socket.SendAsync(e);
                    if (!willRaiseEvent)
                    {
                        ProcessSend(e);
                    }
    
                }
                else
                {
                    CloseClientSocket(e);
                }
            }
    
            // This method is invoked when an asynchronous send operation completes.  
            // The method issues another receive on the socket to read any additional 
            // data sent from the client
            //
            // <param name="e"></param>
            private void ProcessSend(SocketAsyncEventArgs e)
            {
                if (e.SocketError == SocketError.Success)
                {
                    // done echoing data back to the client
                    AsyncUserToken token = (AsyncUserToken)e.UserToken;
                    // read the next block of data send from the client
                    bool willRaiseEvent = token.Socket.ReceiveAsync(e);
                    if (!willRaiseEvent)
                    {
                        ProcessReceive(e);
                    }
                }
                else
                {
                    CloseClientSocket(e);
                }
            }
    
            private void CloseClientSocket(SocketAsyncEventArgs e)
            {
                AsyncUserToken token = e.UserToken as AsyncUserToken;
    
                // close the socket associated with the client
                try
                {
                    token.Socket.Shutdown(SocketShutdown.Send);
                }
                // throws if client process has already closed
                catch (Exception) { }
                token.Socket.Close();
    
                // decrement the counter keeping track of the total number of clients connected to the server
                Interlocked.Decrement(ref m_numConnectedSockets);
                m_maxNumberAcceptedClients.Release();
                Console.WriteLine("A client has been disconnected from the server. There are {0} clients connected to the server", m_numConnectedSockets);
    
                // Free the SocketAsyncEventArg so they can be reused by another client
                m_readWritePool.Push(e);
            }
        }

    class BufferManager
        {
            int m_numBytes;                 // the total number of bytes controlled by the buffer pool 总数量的字节控制缓冲池
            byte[] m_buffer;                // the underlying byte array maintained by the Buffer Manager 底层字节数组所维护的缓冲区管理器
            Stack<int> m_freeIndexPool;     // 
            int m_currentIndex;
            int m_bufferSize;
    
            public BufferManager(int totalBytes, int bufferSize)
            {
                m_numBytes = totalBytes;
                m_currentIndex = 0;
                m_bufferSize = bufferSize;
                m_freeIndexPool = new Stack<int>();
            }
    
            // Allocates buffer space used by the buffer pool
            public void InitBuffer()
            {
                // create one big large buffer and divide that 
                // out to each SocketAsyncEventArg object
                m_buffer = new byte[m_numBytes];
            }
    
            // Assigns a buffer from the buffer pool to the 
            // specified SocketAsyncEventArgs object
            //
            // <returns>true if the buffer was successfully set, else false</returns>
            public bool SetBuffer(SocketAsyncEventArgs args)
            {
    
                if (m_freeIndexPool.Count > 0)
                {
                    args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);
                }
                else
                {
                    if ((m_numBytes - m_bufferSize) < m_currentIndex)
                    {
                        return false;
                    }
                    args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);
                    m_currentIndex += m_bufferSize;
                }
                return true;
            }
    
            // Removes the buffer from a SocketAsyncEventArg object.  
            // This frees the buffer back to the buffer pool
            public void FreeBuffer(SocketAsyncEventArgs args)
            {
                m_freeIndexPool.Push(args.Offset);
                args.SetBuffer(null, 0, 0);
            }
        }

    我现在不明白的地方是:BufferManager类在这里起了个什么作用?上面Server类中黑体字部分,是实例化(New)和设置(SetBuffer) BufferManager类,那么具体在ProcessReceive方法和ProcessSend方法中是怎么使用的,可以说一下原理。


    • 已编辑 lsd1009 2012年5月29日 2:22
    2012年5月29日 2:21

答案

全部回复