none
关于完成端口,iocp内存泄漏问题,求高手解答~ RRS feed

  • 问题

  • 我全局加上了申请和释放内存的计数器,再三检查代码,确认自己申请的内存均已释放~

    实在找不出那里的内存泄漏,求高手~

    项目地址:http://git.oschina.net/jilieryuyi/wing-php

    关键代码:


    void _throw_error( int error_code );
    void _post_msg(int message_id,unsigned long wparam=0,unsigned long lparam=0){
    elemType *msg  = new elemType();  
    if( NULL == msg ) _throw_error(WING_BAD_ERROR);

    memory_add();

    HANDLE handle = GetCurrentProcess();
    PROCESS_MEMORY_COUNTERS pmc;
    GetProcessMemoryInfo(handle, &pmc, sizeof(pmc));

    msg->message_id = message_id;
    msg->wparam = wparam;
    msg->lparam = lparam;
    msg->size  = pmc.WorkingSetSize;

    enQueue(message_queue,msg);
    }

    typedef struct _SOCKET_OBJ
    {
    SOCKET socket;             
    int nOutstandingOps;    
    LPFN_DISCONNECTEX lpfnDisconnectEx; 
    }SOCKET_OBJ,*PSOCKET_OBJ;

    PSOCKET_OBJ GetSocketObj(SOCKET socket)
    {
    PSOCKET_OBJ pSocket = (PSOCKET_OBJ)GlobalAlloc(GPTR,sizeof(SOCKET_OBJ));
    if(pSocket == NULL) return NULL;
    pSocket->socket = socket;
    return pSocket;
    }
    void _close_socket( SOCKET socket){

    CancelIo((HANDLE)socket);

    GUID GuidDisconnectEx = WSAID_DISCONNECTEX;
    DWORD dwBytes;
    PSOCKET_OBJ pSock = GetSocketObj(socket);

    if(NULL == pSock ){
    _post_msg(WM_ONERROR,0,WING_ERROR_CLOSE_SOCKET);
    return;
    }

    WSAIoctl(pSock->socket,SIO_GET_EXTENSION_FUNCTION_POINTER,&GuidDisconnectEx,sizeof(GuidDisconnectEx),&pSock ->lpfnDisconnectEx,sizeof(pSock ->lpfnDisconnectEx),&dwBytes,NULL,NULL);
    //DisconnectEx(socket,NULL,0,0);

    BOOL bIs = pSock->lpfnDisconnectEx(socket,NULL,TF_REUSE_SOCKET,0);
    GlobalFree(pSock);
    }

    void _throw_error( int error_code ){
    exit(WING_BAD_ERROR);
    }


    // iocp工作线程
    unsigned int __stdcall  socket_worker(LPVOID lpParams)  
    {  
    COMPARAMS *params = (COMPARAMS *)lpParams;
    HANDLE ComplectionPort = params->IOCompletionPort;  
    DWORD thread_id = params->threadid;
    DWORD BytesTransferred=0;  
    PER_HANDLE_DATA *PerHandleData=NULL;  
    PER_IO_OPERATION_DATA *PerIOData=NULL;  
    DWORD RecvBytes=0;  
    DWORD Flags=0;
    HANDLE handle = GetCurrentProcess();
    PROCESS_MEMORY_COUNTERS pmc;

    while (TRUE)  
    {  
    GetProcessMemoryInfo(handle, &pmc, sizeof(pmc));
    unsigned int begin_size = pmc.WorkingSetSize;
    BOOL wstatus = GetQueuedCompletionStatus(ComplectionPort,&BytesTransferred,(LPDWORD)&PerHandleData,(LPOVERLAPPED*)&PerIOData,INFINITE);
     

    //服务终止
    if( BytesTransferred==-1 && PerIOData == NULL )   
            { 

    _post_msg(WM_ONQUIT);
    _close_socket(PerHandleData->Socket);

    GlobalFree( PerHandleData );

    PerHandleData = NULL;
    PerIOData = NULL;

    memory_sub();

    return 0;  
            }  

    int error_code = WSAGetLastError();
    unsigned long _socket = (unsigned long)PerHandleData->Socket;
    //掉线检测
    if (BytesTransferred == 0 || 10054 == error_code || 64 == error_code || false == wstatus ||  1236 == error_code) 
    {   
    _post_msg(WM_ONCLOSE,_socket);

    if(1236 != error_code)//服务端调用了 _close_socket 强制终止
    _close_socket(PerHandleData->Socket);

    GlobalFree( PerIOData );
    GlobalFree( PerHandleData );

    PerHandleData = NULL;
    PerIOData = NULL;

    memory_sub();
    memory_sub();

    continue;  
    }

    //收到消息
    if( PerIOData->type == OPE_RECV )  {

    RECV_MSG *recv_msg = new RECV_MSG();   
    if( NULL == recv_msg ){
    _throw_error(WING_ERROR_MALLOC);
    return 0;
    }
    ZeroMemory(recv_msg,sizeof(RECV_MSG));

    recv_msg->msg = new char[BytesTransferred+1];
    ZeroMemory(recv_msg->msg,BytesTransferred+1);

    strcpy_s(recv_msg->msg,BytesTransferred+1,PerIOData->Buffer);
    recv_msg->len =  BytesTransferred+1;

    memory_add();
    memory_add();


    _post_msg(WM_ONRECV,_socket,(unsigned long)recv_msg);

    BytesTransferred = 0;
    Flags = 0;  

    //CloseHandle(PerIOData->OVerlapped.hEvent);

    ZeroMemory(&(PerIOData->OVerlapped),sizeof(OVERLAPPED)); 
    ZeroMemory(PerIOData->Buffer,DATA_BUFSIZE);

    PerIOData->DATABuf.buf = PerIOData->Buffer;  
    PerIOData->DATABuf.len = DATA_BUFSIZE;  
    PerIOData->type = OPE_RECV;

    int error_code = WSARecv(PerHandleData->Socket,&(PerIOData->DATABuf),1,&RecvBytes,&Flags,&(PerIOData->OVerlapped),NULL);

    if( 0 != error_code && WSAGetLastError() != WSA_IO_PENDING){

    _post_msg(WM_ONCLOSE,_socket);
    _close_socket(PerHandleData->Socket);

    GlobalFree( PerIOData );
    GlobalFree( PerHandleData );

    PerIOData = NULL;
    PerHandleData = NULL;

    memory_sub();
    memory_sub();

    }

    }

    //发送消息完成
    else if(PerIOData->type == OPE_SEND)  
    {   
    ZeroMemory(PerIOData,sizeof(PER_IO_OPERATION_DATA));

    //CloseHandle(PerIOData->OVerlapped.hEvent);
    GlobalFree( PerIOData ); 

    PerIOData = NULL;
    BytesTransferred = 0;

    memory_sub();

    _post_msg(WM_ONSEND,_socket);
    }  

    }  
    return 0;
    }  


    //接受客户端连接进来的工作线程
    unsigned int __stdcall  accept_worker(LPVOID _socket) {

    COMPARAMS *_data = (COMPARAMS*)_socket;
    SOCKET m_sockListen = _data->Socket;
    HANDLE m_hIOCompletionPort = _data->IOCompletionPort;
    DWORD threadid = _data->threadid;

    SOCKET accept ;
    PER_HANDLE_DATA *PerHandleData = NULL;
    PER_IO_OPERATION_DATA *PerIOData = NULL;
    DWORD RecvBytes = 0;  
        DWORD Flags = 0; 


    while(true){

    accept = WSAAccept(m_sockListen,NULL,NULL,NULL,0);
    int error_code = WSAGetLastError() ;

    //10024 打开了过多的套接字
    if( INVALID_SOCKET == accept || 10024 == error_code || SOCKET_ERROR == accept ){
    _post_msg(WM_ONERROR,0,WING_ERROR_ACCEPT);
    continue;
    }

    _post_msg(WM_ONCONNECT,(unsigned long)accept);

    PerHandleData =  (PER_HANDLE_DATA*)GlobalAlloc(GPTR,sizeof(PER_HANDLE_DATA));//new PER_HANDLE_DATA();

    if( NULL == PerHandleData ){
    _post_msg(WM_ONCLOSE,(unsigned long)accept);
    _close_socket(accept);
    continue;
    }

    memory_add();
     
    PerHandleData->Socket = accept;
    HANDLE iocp = CreateIoCompletionPort ((HANDLE )accept ,m_hIOCompletionPort , (ULONG_PTR)PerHandleData ,0);

    if( NULL == iocp){

    _post_msg(WM_ONCLOSE,(unsigned long)PerHandleData->Socket);
    _close_socket(PerHandleData->Socket);

    GlobalFree( PerHandleData );

    memory_sub();

    continue;
    }

    PerIOData =   (PER_IO_OPERATION_DATA*)GlobalAlloc(GPTR,sizeof(PER_IO_OPERATION_DATA));// new PER_IO_OPERATION_DATA();
    if ( NULL == PerIOData )
    {
    _post_msg(WM_ONCLOSE,(unsigned long)PerHandleData->Socket);
    _close_socket(PerHandleData->Socket);

    GlobalFree( PerHandleData );
    PerHandleData = NULL;

    memory_sub();
    continue;
    }
    memory_add();

    ZeroMemory(&(PerIOData->OVerlapped),sizeof(OVERLAPPED)); 
    ZeroMemory(PerIOData->Buffer,sizeof(PerIOData->Buffer));

    PerIOData->DATABuf.len = DATA_BUFSIZE; 
    PerIOData->DATABuf.buf = PerIOData->Buffer; 
    PerIOData->type = 1;
    Flags = 0; 

    //这个地方偶尔报异常 还没完全解决
    int code = WSARecv(accept,&(PerIOData->DATABuf),1,&RecvBytes,&Flags,&(PerIOData->OVerlapped),NULL);
    error_code =  WSAGetLastError();
    //调用一次 WSARecv 触发iocp事件
    if(0 != code && WSA_IO_PENDING != error_code){

    _post_msg(WM_ONCLOSE,(unsigned long)PerHandleData->Socket);
    _close_socket(PerHandleData->Socket);

    GlobalFree( PerHandleData );
    GlobalFree( PerIOData );

    PerHandleData = NULL;
    PerIOData = NULL;

    memory_sub();
    memory_sub();
    continue;
    }
    }
    return 0;
    }

    //iocp 服务主线程
    ZEND_FUNCTION(wing_service){

    zval *onreceive = NULL;
    zval *onconnect = NULL;
    zval *onclose = NULL;
    zval *onerror = NULL;
    zval *call_cycle=NULL;
    zval *_params = NULL;
    int port = 0;
    char *listen_ip = NULL;

    MAKE_STD_ZVAL(onreceive);
    MAKE_STD_ZVAL(onconnect);
    MAKE_STD_ZVAL(onclose);
    MAKE_STD_ZVAL(onerror);


    //参数获取
    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z",&_params) != SUCCESS) {

    RETURN_LONG(WING_ERROR_PARAMETER_ERROR);
    return;
    }
    //如果参数不是数组
    if(Z_TYPE_P(_params) != IS_ARRAY){
    RETURN_LONG(WING_ERROR_PARAMETER_ERROR);
    return;
    }


    HashTable *arr_hash = Z_ARRVAL_P(_params);
    int argc= zend_hash_num_elements(arr_hash);
    zval  **data = NULL;
    HashPosition pointer = NULL;


    //数组参数解析
    for(zend_hash_internal_pointer_reset_ex(arr_hash, &pointer); zend_hash_get_current_data_ex(arr_hash, (void**) &data, &pointer) == SUCCESS; zend_hash_move_forward_ex(arr_hash, &pointer)) {
            char *key;
            int key_len;
            long index;
            if (zend_hash_get_current_key_ex(arr_hash, &key, (uint*)&key_len, (ulong*)&index, 0, &pointer) == HASH_KEY_IS_STRING) {

    if(strcmp(key,"port")==0){
    port = Z_LVAL_PP(data);
    }else if(strcmp(key,"listen")==0){
    listen_ip = Z_STRVAL_PP(data);
    }else if(strcmp(key,"onreceive")==0){
    onreceive = *data;
    }else if(strcmp(key,"onconnect")==0){
    onconnect = *data;
    }else if(strcmp(key,"onclose")==0){
    onclose = *data;
    }else if(strcmp(key,"onerror")==0){
    onerror = *data;
    }
    //call_cycle
    else if(strcmp(key,"call_cycle")==0){
    call_cycle = *data;
    }

            } 
        } 

    //初始化消息队列
    message_queue= new queue_t();
    if( NULL == message_queue )exit(WING_BAD_ERROR);
        initQueue(message_queue);  

    //创建完成端口
    HANDLE m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 ); 
    if(m_hIOCompletionPort == NULL){
    RETURN_LONG(WING_ERROR_FAILED);
    return;
    }

    //初始化线程参数
    DWORD thread_id = GetCurrentThreadId();
    COMPARAMS *accept_params = new COMPARAMS();
    if( NULL == accept_params) exit(WING_BAD_ERROR);
    accept_params->threadid = thread_id;
    accept_params->IOCompletionPort = m_hIOCompletionPort;

    //根据cpu数量创建工作线程
    SYSTEM_INFO si; 
    GetSystemInfo(&si); 
    int m_nProcessors = si.dwNumberOfProcessors; 
    int m_nThreads = 2 * m_nProcessors; 
    for (int i = 0; i < m_nThreads; i++) 

    _beginthreadex(NULL, 0, socket_worker, accept_params, 0, NULL); 



    //初始化Socket
    WSADATA wsaData; 
    if(WSAStartup(MAKEWORD(2,2), &wsaData)!=0){
    delete accept_params;
    accept_params = NULL;
    //memory_sub();
    RETURN_LONG(WING_ERROR_FAILED);
    return; 
    }

    //WSACleanup( );
    //初始化Socket
    // 这里需要特别注意,如果要使用重叠I/O的话,这里必须要使用WSASocket来初始化Socket
    // 注意里面有个WSA_FLAG_OVERLAPPED参数
    SOCKET m_sockListen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); 
    if(m_sockListen == INVALID_SOCKET){
    WSACleanup();
    delete accept_params;
    accept_params = NULL;
    RETURN_LONG(WING_ERROR_FAILED);
    return;
    }


    struct sockaddr_in ServerAddress; 
    // 填充地址结构信息
    ZeroMemory(&ServerAddress, sizeof(ServerAddress)); 
    ServerAddress.sin_family = AF_INET; 
    // 这里可以选择绑定任何一个可用的地址,或者是自己指定的一个IP地址
    //ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);                     
    ServerAddress.sin_addr.s_addr = inet_addr(listen_ip);          
    ServerAddress.sin_port = htons(port);   


    // 绑定端口
    if (SOCKET_ERROR == bind(m_sockListen, (struct sockaddr *) &ServerAddress, sizeof(ServerAddress))){
    _close_socket(m_sockListen);
    WSACleanup();
    delete accept_params;
    accept_params = NULL;
    //memory_sub();
    RETURN_LONG(WING_ERROR_FAILED);
    return;
    }  

    // 开始监听
    if( 0 != listen(m_sockListen,SOMAXCONN)){
    _close_socket(m_sockListen);
    WSACleanup();
    delete accept_params;
    accept_params = NULL;
    RETURN_LONG(WING_ERROR_FAILED);
    return;
    }

    accept_params->Socket = m_sockListen;
    //客户端连接进来的处理线程
    _beginthreadex(NULL, 0, accept_worker, accept_params, 0, NULL);


    int times = 0;
    int nSize = 0;
    elemType *msg = NULL;//消息



    HANDLE handle = GetCurrentProcess();
    PROCESS_MEMORY_COUNTERS pmc;
        
    GetProcessMemoryInfo(handle, &pmc, sizeof(pmc));
    unsigned int begin_size = pmc.WorkingSetSize;
        zend_printf("size:%d\r\n",pmc.WorkingSetSize);


    while( true )

    //判断是否有消息
    if(is_emptyQueue(message_queue)){
    Sleep(10);
    continue;
    }
    //_CrtMemCheckpoint( &s1 );
     
    GetProcessMemoryInfo(handle, &pmc, sizeof(pmc));
    zend_printf("size-start:%d\r\n",pmc.WorkingSetSize-begin_size);


    outQueue(message_queue,&msg);

    if( NULL == msg ){

    Sleep(10);
    continue;

    }

    //根据消息ID进行不同的处理
    switch(msg->message_id){
    //新的连接
    case WM_ONCONNECT:
    {
    zval *params = NULL;
    zval *retval_ptr = NULL;

    MAKE_STD_ZVAL(params);
    ZVAL_LONG(params,(long)msg->wparam);//socket资源
    MAKE_STD_ZVAL(retval_ptr);
     
    if( SUCCESS != call_user_function(EG(function_table),NULL,onconnect,retval_ptr,1,&params TSRMLS_CC) ){
    zend_error(E_USER_WARNING,"onconnect call_user_function fail");
    }
    zval_ptr_dtor(&retval_ptr);
    zval_ptr_dtor(&params);
     
    }
    break;
    //收到消息
    case WM_ONRECV:
    {

    RECV_MSG *temp = (RECV_MSG*)msg->lparam;
    SOCKET client = (SOCKET)msg->wparam;

    zval *params[2] = {0};
    zval *retval_ptr = NULL;

    MAKE_STD_ZVAL(params[0]);
    MAKE_STD_ZVAL(params[1]);
    MAKE_STD_ZVAL(retval_ptr);

    ZVAL_LONG(params[0],(long)client);
    ZVAL_STRINGL(params[1],temp->msg,temp->len,1);


    if( SUCCESS != call_user_function(EG(function_table),NULL,onreceive,retval_ptr,2,params TSRMLS_CC) ){
    zend_error(E_USER_WARNING,"onreceive call_user_function fail");
    }

    zval_ptr_dtor(&retval_ptr);
    zval_ptr_dtor(&params[0]);
    zval_ptr_dtor(&params[1]);

    delete[] temp->msg;
    temp->msg = NULL;

    delete temp;
    temp = NULL;

    memory_sub();
    memory_sub();
    }
    break;
    //调用 _close_socket 服务端主动关闭socket
    case WM_ONCLOSE_EX:{

    _close_socket((SOCKET)msg->wparam);
    }break;

    //客户端掉线了
    case WM_ONCLOSE:
    {


    SOCKET client =(SOCKET)msg->wparam;

    zval *params = NULL;
    zval *retval_ptr = NULL;

    MAKE_STD_ZVAL(params);
    ZVAL_LONG(params,(long)client);
    MAKE_STD_ZVAL(retval_ptr);
     
    if(SUCCESS != call_user_function(EG(function_table),NULL,onclose,retval_ptr,1,&params TSRMLS_CC)){
    zend_error(E_USER_WARNING,"WM_ONCLOSE call_user_function fail\r\n");
    }
     
    zval_ptr_dtor(&retval_ptr);
    zval_ptr_dtor(&params);

    }
    break; 
    //发生错误 目前暂时也还没有用到
    case WM_ONERROR:{

    SOCKET client =(SOCKET)msg->wparam;

    zval *params = NULL;
    zval *retval_ptr = NULL;

    MAKE_STD_ZVAL(params);
    ZVAL_LONG(params,(long)msg->lparam);

    MAKE_STD_ZVAL(retval_ptr);
     
    if(SUCCESS != call_user_function(EG(function_table),NULL,onerror,retval_ptr,1,&params TSRMLS_CC)){
    zend_error(E_USER_WARNING,"onerror call_user_function fail");
    }
     
    zval_ptr_dtor(&retval_ptr);
    zval_ptr_dtor(&params);
    }
    break;
    //退出服务 暂时没有测试
    case WM_ONQUIT:
    {
    //zend_printf("quit\r\n");

    //PostQueuedCompletionStatus(m_hIOCompletionPort, 0xFFFFFFFF, 0, NULL);

    CloseHandle(m_hIOCompletionPort);
    _close_socket(m_sockListen);

    WSACleanup();
    clearQueue(message_queue);
    DeleteCriticalSection(&queue_lock);

    delete accept_params;
    delete msg;
    delete message_queue;

    accept_params = NULL;
    msg = NULL;
    message_queue = NULL;

    //zend_printf("quit end\r\n");
    RETURN_LONG(WING_SUCCESS);
    return;
    }break;

    }

    delete msg;
    msg = NULL;

    memory_sub();

    //显示内存申请 释放次数对比
    memory_times_show();
    GetProcessMemoryInfo(handle, &pmc, sizeof(pmc));
    zend_printf("size-last:%d\r\n",pmc.WorkingSetSize-begin_size);
      
        } 
    zend_printf("service quit\r\n");
    RETURN_LONG(WING_SUCCESS);
    }
    /***********************************
     * @停止服务
     ***********************************/
    ZEND_FUNCTION(wing_service_stop){
    _post_msg(WM_ONQUIT);
    RETURN_LONG(WING_SUCCESS);
    }

    /********************************************
     * @ 关闭socket
     * @ param socket
     ********************************************/
    ZEND_FUNCTION(wing_close_socket){

    zval *socket = NULL;
    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z",&socket) != SUCCESS) {
    RETURN_LONG(WING_ERROR_PARAMETER_ERROR);
    return;
    }
    convert_to_long(socket);

    _post_msg(WM_ONCLOSE_EX,Z_LVAL_P(socket));

    RETURN_LONG(WING_SUCCESS);
    }

    /*****************************************
     * @获取socket信息,ip 协议 端口 等
     * @return array
     ****************************************/
    ZEND_FUNCTION(wing_socket_info){

    zval *socket = NULL;
    MAKE_STD_ZVAL(socket);
    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z",&socket) != SUCCESS) {
    RETURN_LONG(WING_ERROR_PARAMETER_ERROR);
    return;
    }

    convert_to_long(socket);
    SOCKADDR_IN addr_conn;  
    int nSize = sizeof(addr_conn);  

    //memset((void *)&addr_conn,0,sizeof(addr_conn)); 
    ZeroMemory(&addr_conn,sizeof(addr_conn));
    getpeername((SOCKET)Z_LVAL_P(socket),(SOCKADDR *)&addr_conn,&nSize);  
      
    array_init(return_value);
    add_assoc_string(return_value,"sin_addr",inet_ntoa(addr_conn.sin_addr),1);
        add_assoc_long(return_value,"sin_family",addr_conn.sin_family);
    add_assoc_long(return_value,"sin_port",addr_conn.sin_port);
    add_assoc_string(return_value,"sin_zero",addr_conn.sin_zero,1);
    }
    /*****************************************
     * @ 发送socket数据
     * @ 同步发送接口 没有使用iocp
     ****************************************/
    ZEND_FUNCTION(wing_socket_send_msg)
    {  

    zval *socket;
    zval *msg;
    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "zz",&socket,&msg) != SUCCESS) {
    RETURN_LONG(WING_ERROR_PARAMETER_ERROR);
    return;
    }
    convert_to_long(socket);

    //此处没有使用完成端口 完成端口发送后 如果直接调用 socketclose 
    //关闭socket 有坑,处理不好会有内存泄漏
    if( SOCKET_ERROR == send((SOCKET)Z_LVAL_P(socket),Z_STRVAL_P(msg),Z_STRLEN_P(msg),0)){
    RETURN_LONG(WING_ERROR_FAILED);
    return;
    }

    RETURN_LONG(WING_SUCCESS);
    }  
    /***************************************************
     * @ 使用iocp异步发送消息
     ***************************************************/ 
    ZEND_FUNCTION(wing_socket_send_msg_ex){
    zval *socket = NULL;
    zval *msg = NULL;
    int close_after_send = 0;//发送完关闭socket 默认为false 否 待定 还没开发

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "zz|l",&socket,&msg,&close_after_send) != SUCCESS) {
    RETURN_LONG(WING_ERROR_PARAMETER_ERROR);
    return;
    }
    convert_to_long(socket);


    SOCKET sClient = (SOCKET)Z_LVAL_P(socket);

    char *pData  = Z_STRVAL_P(msg);
    ulong Length  = Z_STRLEN_P(msg);
    unsigned long  Flag = 0;  
    DWORD SendByte = 0;  

        if ( sClient == INVALID_SOCKET || pData == NULL || Length == 0 ){

    RETURN_LONG(WING_ERROR_FAILED);
    return;  
    }
       
    PER_IO_OPERATION_DATA  *PerIoData =  (PER_IO_OPERATION_DATA*)GlobalAlloc(GPTR,sizeof(PER_IO_OPERATION_DATA));//new PER_IO_OPERATION_DATA();
    memory_add();

    ZeroMemory(&(PerIoData->OVerlapped),sizeof(OVERLAPPED));      
    PerIoData->DATABuf.buf = pData; 
    PerIoData->DATABuf.len = Length; 
    PerIoData->type = OPE_SEND;

    int bRet  = WSASend(sClient,&(PerIoData->DATABuf),1,&SendByte,Flag,&(PerIoData->OVerlapped),NULL);  
    if( bRet != 0 &&  WSAGetLastError() != WSA_IO_PENDING ){

    GlobalFree( PerIoData );
    PerIoData = NULL;
    memory_sub();
    RETURN_LONG(WING_ERROR_FAILED);
    return;
    }
    RETURN_LONG(WING_SUCCESS);
    }


    2016年6月12日 0:39

全部回复

  • 我全局加上了申请和释放内存的计数器,再三检查代码,确认自己申请的内存均已释放~

    实在找不出那里的内存泄漏,求高手~

    项目地址:http://git.oschina.net/jilieryuyi/wing-php

    关键代码:


    void _throw_error( int error_code );
    void _post_msg(int message_id,unsigned long wparam=0,unsigned long lparam=0){
    elemType *msg = new elemType();  
    if( NULL == msg ) _throw_error(WING_BAD_ERROR);

    memory_add();

    HANDLE handle = GetCurrentProcess();
    PROCESS_MEMORY_COUNTERS pmc;
    GetProcessMemoryInfo(handle, &pmc, sizeof(pmc));

    msg->message_id = message_id;
    msg->wparam = wparam;
    msg->lparam = lparam;
    msg->size = pmc.WorkingSetSize;

    enQueue(message_queue,msg);
    }

    typedef struct _SOCKET_OBJ
    {
    SOCKET socket;             
    int nOutstandingOps;    
    LPFN_DISCONNECTEX lpfnDisconnectEx; 
    }SOCKET_OBJ,*PSOCKET_OBJ;

    PSOCKET_OBJ GetSocketObj(SOCKET socket)
    {
    PSOCKET_OBJ pSocket = (PSOCKET_OBJ)GlobalAlloc(GPTR,sizeof(SOCKET_OBJ));
    if(pSocket == NULL) return NULL;
    pSocket->socket = socket;
    return pSocket;
    }
    void _close_socket( SOCKET socket){

    CancelIo((HANDLE)socket);

    GUID GuidDisconnectEx = WSAID_DISCONNECTEX;
    DWORD dwBytes;
    PSOCKET_OBJ pSock = GetSocketObj(socket);

    if(NULL == pSock ){
    _post_msg(WM_ONERROR,0,WING_ERROR_CLOSE_SOCKET);
    return;
    }

    WSAIoctl(pSock->socket,SIO_GET_EXTENSION_FUNCTION_POINTER,&GuidDisconnectEx,sizeof(GuidDisconnectEx),&pSock ->lpfnDisconnectEx,sizeof(pSock ->lpfnDisconnectEx),&dwBytes,NULL,NULL);
    //DisconnectEx(socket,NULL,0,0);

    BOOL bIs = pSock->lpfnDisconnectEx(socket,NULL,TF_REUSE_SOCKET,0);
    GlobalFree(pSock);
    }

    void _throw_error( int error_code ){
    exit(WING_BAD_ERROR);
    }


    // iocp工作线程
    unsigned int __stdcall  socket_worker(LPVOID lpParams)  
    {  
    COMPARAMS *params = (COMPARAMS *)lpParams;
    HANDLE ComplectionPort = params->IOCompletionPort;  
    DWORD thread_id = params->threadid;
    DWORD BytesTransferred=0;  
    PER_HANDLE_DATA *PerHandleData=NULL;  
    PER_IO_OPERATION_DATA *PerIOData=NULL;  
    DWORD RecvBytes=0;  
    DWORD Flags=0;
    HANDLE handle = GetCurrentProcess();
    PROCESS_MEMORY_COUNTERS pmc;

    while (TRUE)  
    {  
    GetProcessMemoryInfo(handle, &pmc, sizeof(pmc));
    unsigned int begin_size = pmc.WorkingSetSize;
    BOOL wstatus = GetQueuedCompletionStatus(ComplectionPort,&BytesTransferred,(LPDWORD)&PerHandleData,(LPOVERLAPPED*)&PerIOData,INFINITE);
     

    //服务终止
    if( BytesTransferred==-1 && PerIOData == NULL )   
            { 

    _post_msg(WM_ONQUIT);
    _close_socket(PerHandleData->Socket);

    GlobalFree( PerHandleData );

    PerHandleData = NULL;
    PerIOData = NULL;

    memory_sub();

    return 0;  
            }  

    int error_code = WSAGetLastError();
    unsigned long _socket = (unsigned long)PerHandleData->Socket;
    //掉线检测
    if (BytesTransferred == 0 || 10054 == error_code || 64 == error_code || false == wstatus ||  1236 == error_code) 
    {   
    _post_msg(WM_ONCLOSE,_socket);

    if(1236 != error_code)//服务端调用了 _close_socket 强制终止
    _close_socket(PerHandleData->Socket);

    GlobalFree( PerIOData );
    GlobalFree( PerHandleData );

    PerHandleData = NULL;
    PerIOData = NULL;

    memory_sub();
    memory_sub();

    continue;  
    }

    //收到消息
    if( PerIOData->type == OPE_RECV )  {

    RECV_MSG *recv_msg = new RECV_MSG();   
    if( NULL == recv_msg ){
    _throw_error(WING_ERROR_MALLOC);
    return 0;
    }
    ZeroMemory(recv_msg,sizeof(RECV_MSG));

    recv_msg->msg = new char[BytesTransferred+1];
    ZeroMemory(recv_msg->msg,BytesTransferred+1);

    strcpy_s(recv_msg->msg,BytesTransferred+1,PerIOData->Buffer);
    recv_msg->len =  BytesTransferred+1;

    memory_add();
    memory_add();


    _post_msg(WM_ONRECV,_socket,(unsigned long)recv_msg);

    BytesTransferred = 0;
    Flags = 0;  

    //CloseHandle(PerIOData->OVerlapped.hEvent);

    ZeroMemory(&(PerIOData->OVerlapped),sizeof(OVERLAPPED)); 
    ZeroMemory(PerIOData->Buffer,DATA_BUFSIZE);

    PerIOData->DATABuf.buf = PerIOData->Buffer;  
    PerIOData->DATABuf.len = DATA_BUFSIZE;  
    PerIOData->type = OPE_RECV;

    int error_code = WSARecv(PerHandleData->Socket,&(PerIOData->DATABuf),1,&RecvBytes,&Flags,&(PerIOData->OVerlapped),NULL);

    if( 0 != error_code && WSAGetLastError() != WSA_IO_PENDING){

    _post_msg(WM_ONCLOSE,_socket);
    _close_socket(PerHandleData->Socket);

    GlobalFree( PerIOData );
    GlobalFree( PerHandleData );

    PerIOData = NULL;
    PerHandleData = NULL;

    memory_sub();
    memory_sub();

    }

    }

    //发送消息完成
    else if(PerIOData->type == OPE_SEND)  
    {   
    ZeroMemory(PerIOData,sizeof(PER_IO_OPERATION_DATA));

    //CloseHandle(PerIOData->OVerlapped.hEvent);
    GlobalFree( PerIOData ); 

    PerIOData = NULL;
    BytesTransferred = 0;

    memory_sub();

    _post_msg(WM_ONSEND,_socket);
    }  

    }  
    return 0;
    }  


    //接受客户端连接进来的工作线程
    unsigned int __stdcall  accept_worker(LPVOID _socket) {

    COMPARAMS *_data = (COMPARAMS*)_socket;
    SOCKET m_sockListen = _data->Socket;
    HANDLE m_hIOCompletionPort = _data->IOCompletionPort;
    DWORD threadid = _data->threadid;

    SOCKET accept ;
    PER_HANDLE_DATA *PerHandleData = NULL;
    PER_IO_OPERATION_DATA *PerIOData = NULL;
    DWORD RecvBytes = 0;  
        DWORD Flags = 0; 


    while(true){

    accept = WSAAccept(m_sockListen,NULL,NULL,NULL,0);
    int error_code = WSAGetLastError() ;

    //10024 打开了过多的套接字
    if( INVALID_SOCKET == accept || 10024 == error_code || SOCKET_ERROR == accept ){
    _post_msg(WM_ONERROR,0,WING_ERROR_ACCEPT);
    continue;
    }

    _post_msg(WM_ONCONNECT,(unsigned long)accept);

    PerHandleData =  (PER_HANDLE_DATA*)GlobalAlloc(GPTR,sizeof(PER_HANDLE_DATA));//new PER_HANDLE_DATA();

    if( NULL == PerHandleData ){
    _post_msg(WM_ONCLOSE,(unsigned long)accept);
    _close_socket(accept);
    continue;
    }

    memory_add();
     
    PerHandleData->Socket = accept;
    HANDLE iocp = CreateIoCompletionPort ((HANDLE )accept ,m_hIOCompletionPort , (ULONG_PTR)PerHandleData ,0);

    if( NULL == iocp){

    _post_msg(WM_ONCLOSE,(unsigned long)PerHandleData->Socket);
    _close_socket(PerHandleData->Socket);

    GlobalFree( PerHandleData );

    memory_sub();

    continue;
    }

    PerIOData =   (PER_IO_OPERATION_DATA*)GlobalAlloc(GPTR,sizeof(PER_IO_OPERATION_DATA));// new PER_IO_OPERATION_DATA();
    if ( NULL == PerIOData )
    {
    _post_msg(WM_ONCLOSE,(unsigned long)PerHandleData->Socket);
    _close_socket(PerHandleData->Socket);

    GlobalFree( PerHandleData );
    PerHandleData = NULL;

    memory_sub();
    continue;
    }
    memory_add();

    ZeroMemory(&(PerIOData->OVerlapped),sizeof(OVERLAPPED)); 
    ZeroMemory(PerIOData->Buffer,sizeof(PerIOData->Buffer));

    PerIOData->DATABuf.len = DATA_BUFSIZE; 
    PerIOData->DATABuf.buf = PerIOData->Buffer; 
    PerIOData->type = 1;
    Flags = 0; 

    //这个地方偶尔报异常 还没完全解决
    int code = WSARecv(accept,&(PerIOData->DATABuf),1,&RecvBytes,&Flags,&(PerIOData->OVerlapped),NULL);
    error_code =  WSAGetLastError();
    //调用一次 WSARecv 触发iocp事件
    if(0 != code && WSA_IO_PENDING != error_code){

    _post_msg(WM_ONCLOSE,(unsigned long)PerHandleData->Socket);
    _close_socket(PerHandleData->Socket);

    GlobalFree( PerHandleData );
    GlobalFree( PerIOData );

    PerHandleData = NULL;
    PerIOData = NULL;

    memory_sub();
    memory_sub();
    continue;
    }
    }
    return 0;
    }

    //iocp 服务主线程
    ZEND_FUNCTION(wing_service){

    zval *onreceive = NULL;
    zval *onconnect = NULL;
    zval *onclose = NULL;
    zval *onerror = NULL;
    zval *call_cycle=NULL;
    zval *_params = NULL;
    int port = 0;
    char *listen_ip = NULL;

    MAKE_STD_ZVAL(onreceive);
    MAKE_STD_ZVAL(onconnect);
    MAKE_STD_ZVAL(onclose);
    MAKE_STD_ZVAL(onerror);


    //参数获取
    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z",&_params) != SUCCESS) {

    RETURN_LONG(WING_ERROR_PARAMETER_ERROR);
    return;
    }
    //如果参数不是数组
    if(Z_TYPE_P(_params) != IS_ARRAY){
    RETURN_LONG(WING_ERROR_PARAMETER_ERROR);
    return;
    }


    HashTable *arr_hash = Z_ARRVAL_P(_params);
    int argc= zend_hash_num_elements(arr_hash);
    zval  **data = NULL;
    HashPosition pointer = NULL;


    //数组参数解析
    for(zend_hash_internal_pointer_reset_ex(arr_hash, &pointer); zend_hash_get_current_data_ex(arr_hash, (void**) &data, &pointer) == SUCCESS; zend_hash_move_forward_ex(arr_hash, &pointer)) {
            char *key;
            int key_len;
            long index;
            if (zend_hash_get_current_key_ex(arr_hash, &key, (uint*)&key_len, (ulong*)&index, 0, &pointer) == HASH_KEY_IS_STRING) {

    if(strcmp(key,"port")==0){
    port = Z_LVAL_PP(data);
    }else if(strcmp(key,"listen")==0){
    listen_ip = Z_STRVAL_PP(data);
    }else if(strcmp(key,"onreceive")==0){
    onreceive = *data;
    }else if(strcmp(key,"onconnect")==0){
    onconnect = *data;
    }else if(strcmp(key,"onclose")==0){
    onclose = *data;
    }else if(strcmp(key,"onerror")==0){
    onerror = *data;
    }
    //call_cycle
    else if(strcmp(key,"call_cycle")==0){
    call_cycle = *data;
    }

            } 
        } 

    //初始化消息队列
    message_queue= new queue_t();
    if( NULL == message_queue )exit(WING_BAD_ERROR);
        initQueue(message_queue);  

    //创建完成端口
    HANDLE m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 ); 
    if(m_hIOCompletionPort == NULL){
    RETURN_LONG(WING_ERROR_FAILED);
    return;
    }

    //初始化线程参数
    DWORD thread_id = GetCurrentThreadId();
    COMPARAMS *accept_params = new COMPARAMS();
    if( NULL == accept_params) exit(WING_BAD_ERROR);
    accept_params->threadid = thread_id;
    accept_params->IOCompletionPort = m_hIOCompletionPort;

    //根据cpu数量创建工作线程
    SYSTEM_INFO si; 
    GetSystemInfo(&si); 
    int m_nProcessors = si.dwNumberOfProcessors; 
    int m_nThreads = 2 * m_nProcessors; 
    for (int i = 0; i < m_nThreads; i++) 

    _beginthreadex(NULL, 0, socket_worker, accept_params, 0, NULL); 



    //初始化Socket
    WSADATA wsaData; 
    if(WSAStartup(MAKEWORD(2,2), &wsaData)!=0){
    delete accept_params;
    accept_params = NULL;
    //memory_sub();
    RETURN_LONG(WING_ERROR_FAILED);
    return; 
    }

    //WSACleanup( );
    //初始化Socket
    // 这里需要特别注意,如果要使用重叠I/O的话,这里必须要使用WSASocket来初始化Socket
    // 注意里面有个WSA_FLAG_OVERLAPPED参数
    SOCKET m_sockListen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); 
    if(m_sockListen == INVALID_SOCKET){
    WSACleanup();
    delete accept_params;
    accept_params = NULL;
    RETURN_LONG(WING_ERROR_FAILED);
    return;
    }


    struct sockaddr_in ServerAddress; 
    // 填充地址结构信息
    ZeroMemory(&ServerAddress, sizeof(ServerAddress)); 
    ServerAddress.sin_family = AF_INET; 
    // 这里可以选择绑定任何一个可用的地址,或者是自己指定的一个IP地址
    //ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);                     
    ServerAddress.sin_addr.s_addr = inet_addr(listen_ip);          
    ServerAddress.sin_port = htons(port);   


    // 绑定端口
    if (SOCKET_ERROR == bind(m_sockListen, (struct sockaddr *) &ServerAddress, sizeof(ServerAddress))){
    _close_socket(m_sockListen);
    WSACleanup();
    delete accept_params;
    accept_params = NULL;
    //memory_sub();
    RETURN_LONG(WING_ERROR_FAILED);
    return;
    }  

    // 开始监听
    if( 0 != listen(m_sockListen,SOMAXCONN)){
    _close_socket(m_sockListen);
    WSACleanup();
    delete accept_params;
    accept_params = NULL;
    RETURN_LONG(WING_ERROR_FAILED);
    return;
    }

    accept_params->Socket = m_sockListen;
    //客户端连接进来的处理线程
    _beginthreadex(NULL, 0, accept_worker, accept_params, 0, NULL);


    int times = 0;
    int nSize = 0;
    elemType *msg = NULL;//消息



    HANDLE handle = GetCurrentProcess();
    PROCESS_MEMORY_COUNTERS pmc;
        
    GetProcessMemoryInfo(handle, &pmc, sizeof(pmc));
    unsigned int begin_size = pmc.WorkingSetSize;
        zend_printf("size:%d\r\n",pmc.WorkingSetSize);


    while( true )

    //判断是否有消息
    if(is_emptyQueue(message_queue)){
    Sleep(10);
    continue;
    }
    //_CrtMemCheckpoint( &s1 );
     
    GetProcessMemoryInfo(handle, &pmc, sizeof(pmc));
    zend_printf("size-start:%d\r\n",pmc.WorkingSetSize-begin_size);


    outQueue(message_queue,&msg);

    if( NULL == msg ){

    Sleep(10);
    continue;

    }

    //根据消息ID进行不同的处理
    switch(msg->message_id){
    //新的连接
    case WM_ONCONNECT:
    {
    zval *params = NULL;
    zval *retval_ptr = NULL;

    MAKE_STD_ZVAL(params);
    ZVAL_LONG(params,(long)msg->wparam);//socket资源
    MAKE_STD_ZVAL(retval_ptr);
     
    if( SUCCESS != call_user_function(EG(function_table),NULL,onconnect,retval_ptr,1,&params TSRMLS_CC) ){
    zend_error(E_USER_WARNING,"onconnect call_user_function fail");
    }
    zval_ptr_dtor(&retval_ptr);
    zval_ptr_dtor(&params);
     
    }
    break;
    //收到消息
    case WM_ONRECV:
    {

    RECV_MSG *temp = (RECV_MSG*)msg->lparam;
    SOCKET client = (SOCKET)msg->wparam;

    zval *params[2] = {0};
    zval *retval_ptr = NULL;

    MAKE_STD_ZVAL(params[0]);
    MAKE_STD_ZVAL(params[1]);
    MAKE_STD_ZVAL(retval_ptr);

    ZVAL_LONG(params[0],(long)client);
    ZVAL_STRINGL(params[1],temp->msg,temp->len,1);


    if( SUCCESS != call_user_function(EG(function_table),NULL,onreceive,retval_ptr,2,params TSRMLS_CC) ){
    zend_error(E_USER_WARNING,"onreceive call_user_function fail");
    }

    zval_ptr_dtor(&retval_ptr);
    zval_ptr_dtor(&params[0]);
    zval_ptr_dtor(&params[1]);

    delete[] temp->msg;
    temp->msg = NULL;

    delete temp;
    temp = NULL;

    memory_sub();
    memory_sub();
    }
    break;
    //调用 _close_socket 服务端主动关闭socket
    case WM_ONCLOSE_EX:{

    _close_socket((SOCKET)msg->wparam);
    }break;

    //客户端掉线了
    case WM_ONCLOSE:
    {


    SOCKET client =(SOCKET)msg->wparam;

    zval *params = NULL;
    zval *retval_ptr = NULL;

    MAKE_STD_ZVAL(params);
    ZVAL_LONG(params,(long)client);
    MAKE_STD_ZVAL(retval_ptr);
     
    if(SUCCESS != call_user_function(EG(function_table),NULL,onclose,retval_ptr,1,&params TSRMLS_CC)){
    zend_error(E_USER_WARNING,"WM_ONCLOSE call_user_function fail\r\n");
    }
     
    zval_ptr_dtor(&retval_ptr);
    zval_ptr_dtor(&params);

    }
    break; 
    //发生错误 目前暂时也还没有用到
    case WM_ONERROR:{

    SOCKET client =(SOCKET)msg->wparam;

    zval *params = NULL;
    zval *retval_ptr = NULL;

    MAKE_STD_ZVAL(params);
    ZVAL_LONG(params,(long)msg->lparam);

    MAKE_STD_ZVAL(retval_ptr);
     
    if(SUCCESS != call_user_function(EG(function_table),NULL,onerror,retval_ptr,1,&params TSRMLS_CC)){
    zend_error(E_USER_WARNING,"onerror call_user_function fail");
    }
     
    zval_ptr_dtor(&retval_ptr);
    zval_ptr_dtor(&params);
    }
    break;
    //退出服务 暂时没有测试
    case WM_ONQUIT:
    {
    //zend_printf("quit\r\n");

    //PostQueuedCompletionStatus(m_hIOCompletionPort, 0xFFFFFFFF, 0, NULL);

    CloseHandle(m_hIOCompletionPort);
    _close_socket(m_sockListen);

    WSACleanup();
    clearQueue(message_queue);
    DeleteCriticalSection(&queue_lock);

    delete accept_params;
    delete msg;
    delete message_queue;

    accept_params = NULL;
    msg = NULL;
    message_queue = NULL;

    //zend_printf("quit end\r\n");
    RETURN_LONG(WING_SUCCESS);
    return;
    }break;

    }

    delete msg;
    msg = NULL;

    memory_sub();

    //显示内存申请 释放次数对比
    memory_times_show();
    GetProcessMemoryInfo(handle, &pmc, sizeof(pmc));
    zend_printf("size-last:%d\r\n",pmc.WorkingSetSize-begin_size);
      
        } 
    zend_printf("service quit\r\n");
    RETURN_LONG(WING_SUCCESS);
    }
    /***********************************
     * @停止服务
     ***********************************/
    ZEND_FUNCTION(wing_service_stop){
    _post_msg(WM_ONQUIT);
    RETURN_LONG(WING_SUCCESS);
    }

    /********************************************
     * @ 关闭socket
     * @ param socket
     ********************************************/
    ZEND_FUNCTION(wing_close_socket){

    zval *socket = NULL;
    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z",&socket) != SUCCESS) {
    RETURN_LONG(WING_ERROR_PARAMETER_ERROR);
    return;
    }
    convert_to_long(socket);

    _post_msg(WM_ONCLOSE_EX,Z_LVAL_P(socket));

    RETURN_LONG(WING_SUCCESS);
    }

    /*****************************************
     * @获取socket信息,ip 协议 端口 等
     * @return array
     ****************************************/
    ZEND_FUNCTION(wing_socket_info){

    zval *socket = NULL;
    MAKE_STD_ZVAL(socket);
    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z",&socket) != SUCCESS) {
    RETURN_LONG(WING_ERROR_PARAMETER_ERROR);
    return;
    }

    convert_to_long(socket);
    SOCKADDR_IN addr_conn;  
    int nSize = sizeof(addr_conn);  

    //memset((void *)&addr_conn,0,sizeof(addr_conn)); 
    ZeroMemory(&addr_conn,sizeof(addr_conn));
    getpeername((SOCKET)Z_LVAL_P(socket),(SOCKADDR *)&addr_conn,&nSize);  
      
    array_init(return_value);
    add_assoc_string(return_value,"sin_addr",inet_ntoa(addr_conn.sin_addr),1);
        add_assoc_long(return_value,"sin_family",addr_conn.sin_family);
    add_assoc_long(return_value,"sin_port",addr_conn.sin_port);
    add_assoc_string(return_value,"sin_zero",addr_conn.sin_zero,1);
    }
    /*****************************************
     * @ 发送socket数据
     * @ 同步发送接口 没有使用iocp
     ****************************************/
    ZEND_FUNCTION(wing_socket_send_msg)
    {  

    zval *socket;
    zval *msg;
    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "zz",&socket,&msg) != SUCCESS) {
    RETURN_LONG(WING_ERROR_PARAMETER_ERROR);
    return;
    }
    convert_to_long(socket);

    //此处没有使用完成端口 完成端口发送后 如果直接调用 socketclose 
    //关闭socket 有坑,处理不好会有内存泄漏
    if( SOCKET_ERROR == send((SOCKET)Z_LVAL_P(socket),Z_STRVAL_P(msg),Z_STRLEN_P(msg),0)){
    RETURN_LONG(WING_ERROR_FAILED);
    return;
    }

    RETURN_LONG(WING_SUCCESS);
    }  
    /***************************************************
     * @ 使用iocp异步发送消息
     ***************************************************/ 
    ZEND_FUNCTION(wing_socket_send_msg_ex){
    zval *socket = NULL;
    zval *msg = NULL;
    int close_after_send = 0;//发送完关闭socket 默认为false 否 待定 还没开发

    if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "zz|l",&socket,&msg,&close_after_send) != SUCCESS) {
    RETURN_LONG(WING_ERROR_PARAMETER_ERROR);
    return;
    }
    convert_to_long(socket);


    SOCKET sClient = (SOCKET)Z_LVAL_P(socket);

    char *pData = Z_STRVAL_P(msg);
    ulong Length = Z_STRLEN_P(msg);
    unsigned long  Flag = 0;  
    DWORD SendByte = 0;  

        if ( sClient == INVALID_SOCKET || pData == NULL || Length == 0 ){

    RETURN_LONG(WING_ERROR_FAILED);
    return;  
    }
       
    PER_IO_OPERATION_DATA  *PerIoData =  (PER_IO_OPERATION_DATA*)GlobalAlloc(GPTR,sizeof(PER_IO_OPERATION_DATA));//new PER_IO_OPERATION_DATA();
    memory_add();

    ZeroMemory(&(PerIoData->OVerlapped),sizeof(OVERLAPPED));      
    PerIoData->DATABuf.buf = pData; 
    PerIoData->DATABuf.len = Length; 
    PerIoData->type = OPE_SEND;

    int bRet  = WSASend(sClient,&(PerIoData->DATABuf),1,&SendByte,Flag,&(PerIoData->OVerlapped),NULL);  
    if( bRet != 0 &&  WSAGetLastError() != WSA_IO_PENDING ){

    GlobalFree( PerIoData );
    PerIoData = NULL;
    memory_sub();
    RETURN_LONG(WING_ERROR_FAILED);
    return;
    }
    RETURN_LONG(WING_SUCCESS);
    }

    2016年6月12日 0:31
  • #define WM_ONCONNECT WM_USER+60
    #define WM_ACCEPT_ERROR WM_USER+61
    #define WM_ONERROR WM_USER+62
    #define WM_ONCLOSE WM_USER+63
    #define WM_ONRECV WM_USER+64
    #define WM_ONQUIT           WM_USER+65
    #define WM_ONCLOSE_EX WM_USER+66
    #define WM_ONSEND WM_USER+67

    #define WM_TEST WM_USER+68
    #define WM_TEST2 WM_USER+69
    #define WM_TEST3 WM_USER+70

    #define WING_ERROR_CLOSE_SOCKET 4001
    #define WING_ERROR_ACCEPT 4002
    #define WING_ERROR_MALLOC 4003
    #define WING_BAD_ERROR -4

    char  *PHP_PATH = NULL;
    //timer 进程计数器 用于控制多个timer的创建和运行
    static int wing_timer_count = 0;
    //线程计数器 用于多线程控制
    static int wing_thread_count =0;


    #define WING_SUCCESS 1
    #define WING_CALLBACK_SUCCESS 0
    #define WING_ERROR_PARAMETER_ERROR -1
    #define WING_ERROR_FAILED -2
    #define WING_NOTICE_IGNORE -3
    #define WING_ERROR_CALLBACK_FAILED  -4
    #define WING_ERROR_PROCESS_NOT_EXISTS -5
    #define WING_ERROR_WINDOW_NOT_FOUND -6
    #define WING_PROCESS_IS_RUNNING 1


    //-------------socket相关参数-----------------------
    #define DATA_BUFSIZE 10240
    #define OPE_RECV 1
    #define OPE_SEND 2
    typedef struct  


      OVERLAPPED OVerlapped; 
      WSABUF DATABuf; 
      CHAR Buffer[DATA_BUFSIZE]; 
      int type;

    }PER_IO_OPERATION_DATA;


    typedef struct{ 
      SOCKET Socket;
    } PER_HANDLE_DATA; 

    typedef struct{ 
      SOCKET Socket;
      HANDLE IOCompletionPort;
      DWORD threadid;
    } COMPARAMS; 

    typedef struct{
    long len;
    char *msg;//[DATA_BUFSIZE+1]; 
    } RECV_MSG;
    //-------------socket相关参数------end-----------------


    2016年6月12日 0:40

  • //-------------内存计数器-----------------------
    unsigned long memory_add_times = 0;
    unsigned long memory_sub_times = 0;


    //CRITICAL_SECTION bytes_lock;
    void memory_add(){
        InterlockedIncrement(&memory_add_times);
    }
    void memory_sub(){
        InterlockedIncrement(&memory_sub_times);
    }
    //-------------内存计数器-----------------------



    //----消息队列----------------------------
    CRITICAL_SECTION queue_lock;
    typedef struct _thread_msg{
    int message_id;
    unsigned long lparam;
    unsigned long wparam;
    unsigned int size;

    } THREAD_MSG,elemType;

    typedef struct nodet   
    {  
        elemType *data;  
        struct nodet * next;  
    } node_t;            // 节点的结构  
      
    typedef struct queuet  
    {  
        node_t * head;  
        node_t * tail;  
    } queue_t;          // 队列的结构  

    queue_t *message_queue = NULL;

    void initQueue(queue_t * queue_eg)  
    {  
    if( NULL == queue_eg) return;
    InitializeCriticalSection(&queue_lock);
        queue_eg->head = NULL; //队头标志位  
        queue_eg->tail = NULL; //队尾标志位  
    }  
       
    int enQueue(queue_t *hq, elemType *x)  
    {  
    if( hq == NULL || NULL == x)
    return 0;

    EnterCriticalSection(&queue_lock);

        node_t * nnode = new node_t();

        if (nnode == NULL )  
        {  
    LeaveCriticalSection(&queue_lock);
            return 0;
        } 

        nnode->data = x;  
        nnode->next = NULL;  
        if (hq->head == NULL)  
        {  
            hq->head = nnode;  
            hq->tail = nnode;  
        } else {  
            hq->tail->next = nnode;  
            hq->tail = nnode;  
        }  

    memory_add();

    LeaveCriticalSection(&queue_lock);
        return 1;  
    }  
      
    void outQueue(queue_t * hq,elemType **temp)  
    {  
    if( NULL == hq) return;

    EnterCriticalSection(&queue_lock);
        node_t * p = NULL;  

        if (hq->head == NULL)  
        {  
    *temp = NULL;
    LeaveCriticalSection(&queue_lock);
    return;
        }  

        *temp = hq->head->data;  
        p = hq->head;  
        hq->head = hq->head->next;  
        if(hq->head == NULL)  
        {  
            hq->tail = NULL;  
        }  

        delete p;  
    p  = NULL;
    memory_sub();
    LeaveCriticalSection(&queue_lock);
    }  
      
    elemType *peekQueue(queue_t * hq)  
    {  
    if( NULL == hq ) return NULL;

        if (hq->head == NULL)  
        {  
            return NULL; 
        }   
        return hq->head->data;  
    }  
      
    int is_emptyQueue(queue_t * hq)  
    {  
    if( NULL == hq ) return 1;

        if (hq->head == NULL)  
        {  
            return 1;  
        } else {  
            return 0;  
        }  
    }  
        
    void clearQueue(queue_t * hq)  
    {  
    if( NULL == hq ) return;

        node_t * p = hq->head;  
        while(p != NULL)  
        {  
            hq->head = hq->head->next;  

            delete p; 
    p = NULL;
    memory_sub();
            p = hq->head;  
        }  
        hq->tail = NULL;  
        return;  
    }  
     
    //----消息队列----------end------------------

    2016年6月12日 0:40
  • 参考以下工具试下

    https://vld.codeplex.com/

    Visual Leak Detector

    http://www.codeproject.com/Articles/3134/Memory-Leak-and-Exception-Trace-CRT-and-COM-Leaks


    专注于.NET ERP/CRM开发框架,C/S架构,SQL Server + ORM(LLBL Gen Pro) + Infragistics WinForms

    2016年6月12日 1:55
  • 从调试信息大概能看出来,关闭socket和收到消息内存释放不正常~

    size:436682752
    start memory:436936704
    onconnect
    new times : 12 , free times : 2 , need more free : 10
    free memory:-8192
    add memory:245760

    --------------------------------------------------------------------------------

    start memory:436928512
    onconnect
    new times : 12 , free times : 4 , need more free : 8
    free memory:-8192
    add memory:237568

    --------------------------------------------------------------------------------

    start memory:436920320
    onconnect
    new times : 12 , free times : 6 , need more free : 6
    free memory:-8192
    add memory:229376

    --------------------------------------------------------------------------------

    start memory:436940800
    onrecv
    new times : 18 , free times : 10 , need more free : 8
    free memory:3305472
    add memory:3563520

    --------------------------------------------------------------------------------

    start memory:440246272
    oncloseex
    new times : 19 , free times : 12 , need more free : 7
    free memory:16384
    add memory:3579904

    --------------------------------------------------------------------------------

    start memory:440250368
    onclose
    new times : 20 , free times : 16 , need more free : 4
    free memory:-12288
    add memory:3555328

    --------------------------------------------------------------------------------

    start memory:440254464
    onrecv
    new times : 26 , free times : 20 , need more free : 6
    free memory:12288
    add memory:3584000

    --------------------------------------------------------------------------------

    start memory:440266752
    oncloseex
    new times : 27 , free times : 22 , need more free : 5
    free memory:0
    add memory:3584000

    --------------------------------------------------------------------------------

    start memory:440266752
    onclose
    new times : 28 , free times : 26 , need more free : 2
    free memory:-24576
    add memory:3559424

    --------------------------------------------------------------------------------

    start memory:440258560
    onrecv
    new times : 34 , free times : 30 , need more free : 4
    free memory:-4096
    add memory:3571712

    --------------------------------------------------------------------------------

    start memory:440254464
    oncloseex
    new times : 35 , free times : 32 , need more free : 3
    free memory:0
    add memory:3571712

    --------------------------------------------------------------------------------

    start memory:440254464
    onclose
    new times : 36 , free times : 36 , need more free : 0
    free memory:-24576
    add memory:3547136

    --------------------------------------------------------------------------------


    2016年6月12日 4:36
  • vld可以调试守护进程么?

    以上程序是一个php扩展~

    不知道是否支持,回头尝试一下,谢谢您的支持

    2016年6月12日 4:39
  • vld好像不是很适用于这种php扩展dll的场合~~
    2016年6月12日 13:05
  • Hi 吉利儿

    感谢在MSDN论坛发帖。

    对于你的情况,我建议使用MemTrack或者BoundsChecker进行内存检查。可以更准确的找到问题。希望对你有所帮助。

    Best Regards,

    Sera Yu


    We are trying to better understand customer views on social support experience, so your participation in this interview project would be greatly appreciated if you have time. Thanks for helping make community forums a great place.

    2016年6月13日 6:14