none
VC++ 2008 Express - 多线程CRITICAL_SECTION同步失败 RRS feed

  • 问题

  • 有两个线程:
    线程1连续读取文件的一部分,将读取的部分放入链表中,release semaphore,线程2等待semaphore,读取链表。但是线程2偶尔报链表是空。
    相应代码如下:
    链表:
    struct Node {
     unsigned char buf[ITEM_SIZE + 1];
     unsigned int length;
     struct Node *next;
    };
    struct List {
     struct Node *head;
     struct Node *tail;
     CRITICAL_SECTION critical_sec;
    };

    //semaphore
    HANDLE hWorkerSem[SIZE_OF_CALLBACK];
    //线程1将读取的部分放入该链表中,线程2也读取该链表
    struct List *Alist[SIZE_OF_CALLBACK];
    //为了提高效率,建立了Node池,线程1从该线程提取一个空闲的Node, 将读取文件的一部分放入Node中,然后放入Alist中
    //线程2从Alist读取一个Node, 处理完后放入Flist
    struct List *Flist[SIZE_OF_CALLBACK];

    //下面是对Alist和Flist进行操作的函数
    void addAvailableNode(int type, struct Node *node)
    {
     EnterCriticalSection(&Alist[type]->critical_sec);
     if (node != NULL)
     {
      if (Alist[type]->head == NULL)
      {
       Alist[type]->head = node;
       Alist[type]->tail = node;
      }
      else
      {
       Alist[type]->tail->next = node;
       Alist[type]->tail = node;
      }
      //printf("thread %d addAvailableNode() type %d node %p\n",GetCurrentThreadId(),  type, node);
      LeaveCriticalSection(&Alist[type]->critical_sec);
      ReleaseSemaphore(hWorkerSem[type], 1, NULL);
      return;
     }
     printf("thread %d addAvailableNode() NULL node pointer: type %d\n", GetCurrentThreadId(), type);
     LeaveCriticalSection(&Alist[type]->critical_sec);
    }

    void getAvailableNode(int type, struct Node **node)
    {
     EnterCriticalSection(&(Alist[type]->critical_sec));
     *node = Alist[type]->head;
     if (Alist[type]->head == NULL)
     {
      printf("thread %d Error, no available node, index %d!\n", GetCurrentThreadId(), type);
     }
     else
     {
      Alist[type]->head = Alist[type]->head->next;
      //printf("thread %d getAvailableNode() type %d node %p\n", GetCurrentThreadId(), type, *node);
     }
     LeaveCriticalSection(&(Alist[type]->critical_sec));
    }

    void getFreeNode(int type, struct Node **node)
    {
     EnterCriticalSection(&Flist[type]->critical_sec);
     *node = Flist[type]->head;
     if (Flist[type]->head == NULL)
     {
      *node = (struct Node *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(struct Node));
      if (*node == NULL)
      {
       //really bad, no memory?
       printf("\n\nFatal error, malloc for free node failed!\n\n");
       exit(-1);
      }
     }
     else
     {
      Flist[type]->head = Flist[type]->head->next;
     }
     memset(*node, 0, sizeof(struct Node));
     //printf("thread %d getFreeNode() type %d node %p\n", GetCurrentThreadId(), type, *node);
     LeaveCriticalSection(&Flist[type]->critical_sec);
    }

    void addFreeNode(int type, struct Node *node)
    {
     EnterCriticalSection(&Flist[type]->critical_sec);
     if (node != NULL)
     {
      if (Flist[type]->head == NULL)
      {
       Flist[type]->head = node;
       Flist[type]->tail = node;
      }
      else
      {
       Flist[type]->tail->next = node;
       Flist[type]->tail = node;
      }
     }
     //printf("thread %d addFreeNode() type %d node %p\n", GetCurrentThreadId(), type, node);
     LeaveCriticalSection(&Flist[type]->critical_sec);
    }

    下面是线程的操作:
    线程1:
    getFreeNode(type, &node); //type 值是线程启动时传入的。
     //read in the whole record
     rt = fread(node->buf, sizeof(unsigned char), actLen, file);
     if(rt != actLen)
     {
      printf("Error reading %d length of file - rt %d\n", actLen, rt);
      addFreeNode(type, node);
      return -1;
     }
     node->length = actLen;
     addAvailableNode(type, node);

    线程2:
    unsigned __stdcall threadStartMOC(void *args)
    {
     HANDLE sem = (HANDLE)(args); //sem是hWorkerSem[SIZE_OF_CALLBACK]中的一个,线程启动时传入。

     while(1)
     {
      WaitForSingleObject(sem, INFINITE);
      
      struct Node *node = NULL;
      getAvailableNode(T_MOC, &node);
      if (node == NULL)
      {
       continue;
      }

      //process file content
      porcessREC(node);

      addFreeNode(T_REC, node);
     }
     _endthreadex( 0 );
     return 0;
    }

    运行时偶尔报错:
    thread 3020 Error, no available node, index 5!
    thread 3052 Error, no available node, index 4!
    thread 2300 Error, no available node, index 5!
    thread 2200 Error, no available node, index 5!
    thread 2512 Error, no available node, index 5!
    thread 1704 Error, no available node, index 5!
    thread 3020 Error, no available node, index 5!
    thread 2300 Error, no available node, index 5!
    thread 1704 Error, no available node, index 5!
    thread 3020 Error, no available node, index 5!
    thread 1704 Error, no available node, index 5!
    thread 3488 Error, no available node, index 4!
    thread 2356 Error, no available node, index 4!
    thread 3052 Error, no available node, index 4!
    thread 3052 Error, no available node, index 4!
    thread 2356 Error, no available node, index 4!
    thread 3488 Error, no available node, index 4!
    thread 3476 Error, no available node, index 4!
    thread 2356 Error, no available node, index 4!
    thread 3488 Error, no available node, index 4!

    不在错在哪里?
    我的CPU是Intel双核的E5200.
    谢谢帮助!

    2010年1月26日 14:09

答案

  • 感谢SplendourG的回复!

    我相信我解决了这个问题,以下是改后的代码,供感兴趣的朋友参考。

    //生产者线程
    EnterCriticalSection(&Flist[type]->critical_sec);
     struct Node* node = getFreeNode(type);
     LeaveCriticalSection(&Flist[type]->critical_sec);

     //read in the whole record
     rt = fread(node->buf, sizeof(unsigned char), actLen, file);
     if(rt != actLen)
     {
      printf("Error reading %d length of file - rt %d\n", actLen, rt);
      addFreeNode(type, node);
      return -1;
     }
     node->length = actLen;
     addAvailableNode(type, node);

    //消费者线程
    DWORD dw = WaitForSingleObject(sem, INFINITE);
      switch(dw) {
       case WAIT_OBJECT_0:
        break;
       default:
        printf("WaitForSingleObject ERROR %d\n", GetLastError());
        continue;
      }

      EnterCriticalSection(&(Alist[type]->critical_sec));
      struct Node *node = getAvailableNode(type);
      LeaveCriticalSection(&(Alist[type]->critical_sec));
      if (node == NULL)
      {
       continue;
      }
      populateNode(node);
      addFreeNode(type, node);


    //Node处理函数
    void addAvailableNode(volatile int type, struct Node *node)
    {
     if (node == NULL)
      return;

     EnterCriticalSection(&Alist[type]->critical_sec);
     if (Alist[type]->head == NULL)
     {
      Alist[type]->head = node;
      Alist[type]->tail = node;
     }
     else
     {  
      Alist[type]->tail->next = node;
      Alist[type]->tail = node;
     }

     //printf("addAvailableNode() type %d node %p\n", type, node);
     ReleaseSemaphore(hWorkerSem[type], 1, NULL);
     LeaveCriticalSection(&Alist[type]->critical_sec);
     return;
    }

    struct Node* getAvailableNode(volatile int type)
    {
     struct Node *node = NULL;
     if (Alist[type]->head == NULL)
     {
      printf("thread %d Error, no available node, index %dn", GetCurrentThreadId(), type);
     }
     else
     {
      node = Alist[type]->head;  
      Alist[type]->head = Alist[type]->head->next;
      node->next = NULL;
      //printf("getAvailableNode() type %d node %p\n", type, node);
     }
     return node;
    }

    void addFreeNode(volatile int type, struct Node *node)
    {
     if (node == NULL)
      return;

     EnterCriticalSection(&Flist[type]->critical_sec);
     if (Flist[type]->head == NULL)
     {
      Flist[type]->head = node;
      Flist[type]->tail = node;
     }
     else
     {
      Flist[type]->tail->next = node;
      Flist[type]->tail = node;
     }
     //printf(" addFreeNode() type %d node %p\n", type, node);
     LeaveCriticalSection(&Flist[type]->critical_sec);
    }


    struct Node* getFreeNode(volatile int type)
    {
     struct Node *node = NULL;
     if (Flist[type]->head == NULL)
     {
      node = (struct Node *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(struct Node));
      if (node == NULL)
      {
       //really bad, no memory?
       printf("\n\nFatal error, malloc for free node failed!\n\n");
       exit(-1);
      }
     }
     else
     {
      node = Flist[type]->head;
      Flist[type]->head = Flist[type]->head->next;
      node->next = NULL;
      node->length = 0;
      node->buf[0] = '\0';
     }
     
     //printf("getFreeNode() type %d node %p\n", type, node);
     return node;
    }

    快乐周末!

    • 已标记为答案 王新鲁 2010年1月29日 15:03
    2010年1月29日 15:01

全部回复

  • 首先保证每个消费者线程的sem 都是正确的。并且是否有每种type有多个消费者的情况?
    试一下在addAvailableNode中
    ReleaseSemaphore(hWorkerSem[type], 1, NULL);
    前添加代码:
    static LONG lValue = 0;
    InterlockedIncrement(&lValue );
    lValue = 0;


    麻烦把正确答案设为解答。
    2010年1月27日 6:48
    版主
  • 谢谢SplendourG的回复,每个消费者线程的sem 应该是正确。在我的例子里是每种type有多个消费者的情况,我也试过每种type有一个消费者的情况,仍然偶尔出错(伴有内存访问错误)。
    我添加了InterlockedIncrement()代码,但仍然报错。
    谢谢。
    2010年1月27日 13:00
  • CRITICAL_SECTION critical_sec InitializeCriticalSection()了?


    麻烦把正确答案设为解答。
    2010年1月28日 1:46
    版主
  • 是的, 线程启动前已经初始化。

    2010年1月28日 13:24
  • 原来是没有初始化也。
    2010年1月29日 1:52
  • 不好意思,我是说critical_sec InitializeCriticalSection()在线程启动前已经调用过了。
    2010年1月29日 4:13
  • SplendourG,谢谢回复。我将情况进一步说明下:
    生产者只有一个线程,它读取文件,根据读取内容的类型(即消费者的type)将读取的内容放入相应type的Alist中。
    消费者有多个,每个对应一种type,且对应自己的hWorkerSem。
    我试图打印每个Node的指针(就是在每个函数里被//掉的那些printf语句),但是这样问题就不见了。
    对照没问题的和出错的结果发现,出错的结果中少了一些文件内容,应该是出错时没读到的那些。
    我想应该是指针有问题,单单从代码角度看好像逻辑是对的,我考虑是否是因为编译器优化出的问题,多线程下应该对某些变量加上volatile。只是我的初步想法,还没试验。
    谢谢。
    2010年1月29日 4:24
  • 谢谢SplendourG的回复,每个消费者线程的sem 应该是正确。在我的例子里是每种type有多个消费者的情况,我也试过每种type有一个消费者的情况,仍然偶尔出错(伴有内存访问错误)。
    我添加了InterlockedIncrement()代码,但仍然报错。
    谢谢。

    伴有内存访问错误内存访问错误是指向哪里错误。
    麻烦把正确答案设为解答。
    2010年1月29日 6:14
    版主
  • 感谢SplendourG的回复!

    我相信我解决了这个问题,以下是改后的代码,供感兴趣的朋友参考。

    //生产者线程
    EnterCriticalSection(&Flist[type]->critical_sec);
     struct Node* node = getFreeNode(type);
     LeaveCriticalSection(&Flist[type]->critical_sec);

     //read in the whole record
     rt = fread(node->buf, sizeof(unsigned char), actLen, file);
     if(rt != actLen)
     {
      printf("Error reading %d length of file - rt %d\n", actLen, rt);
      addFreeNode(type, node);
      return -1;
     }
     node->length = actLen;
     addAvailableNode(type, node);

    //消费者线程
    DWORD dw = WaitForSingleObject(sem, INFINITE);
      switch(dw) {
       case WAIT_OBJECT_0:
        break;
       default:
        printf("WaitForSingleObject ERROR %d\n", GetLastError());
        continue;
      }

      EnterCriticalSection(&(Alist[type]->critical_sec));
      struct Node *node = getAvailableNode(type);
      LeaveCriticalSection(&(Alist[type]->critical_sec));
      if (node == NULL)
      {
       continue;
      }
      populateNode(node);
      addFreeNode(type, node);


    //Node处理函数
    void addAvailableNode(volatile int type, struct Node *node)
    {
     if (node == NULL)
      return;

     EnterCriticalSection(&Alist[type]->critical_sec);
     if (Alist[type]->head == NULL)
     {
      Alist[type]->head = node;
      Alist[type]->tail = node;
     }
     else
     {  
      Alist[type]->tail->next = node;
      Alist[type]->tail = node;
     }

     //printf("addAvailableNode() type %d node %p\n", type, node);
     ReleaseSemaphore(hWorkerSem[type], 1, NULL);
     LeaveCriticalSection(&Alist[type]->critical_sec);
     return;
    }

    struct Node* getAvailableNode(volatile int type)
    {
     struct Node *node = NULL;
     if (Alist[type]->head == NULL)
     {
      printf("thread %d Error, no available node, index %dn", GetCurrentThreadId(), type);
     }
     else
     {
      node = Alist[type]->head;  
      Alist[type]->head = Alist[type]->head->next;
      node->next = NULL;
      //printf("getAvailableNode() type %d node %p\n", type, node);
     }
     return node;
    }

    void addFreeNode(volatile int type, struct Node *node)
    {
     if (node == NULL)
      return;

     EnterCriticalSection(&Flist[type]->critical_sec);
     if (Flist[type]->head == NULL)
     {
      Flist[type]->head = node;
      Flist[type]->tail = node;
     }
     else
     {
      Flist[type]->tail->next = node;
      Flist[type]->tail = node;
     }
     //printf(" addFreeNode() type %d node %p\n", type, node);
     LeaveCriticalSection(&Flist[type]->critical_sec);
    }


    struct Node* getFreeNode(volatile int type)
    {
     struct Node *node = NULL;
     if (Flist[type]->head == NULL)
     {
      node = (struct Node *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(struct Node));
      if (node == NULL)
      {
       //really bad, no memory?
       printf("\n\nFatal error, malloc for free node failed!\n\n");
       exit(-1);
      }
     }
     else
     {
      node = Flist[type]->head;
      Flist[type]->head = Flist[type]->head->next;
      node->next = NULL;
      node->length = 0;
      node->buf[0] = '\0';
     }
     
     //printf("getFreeNode() type %d node %p\n", type, node);
     return node;
    }

    快乐周末!

    • 已标记为答案 王新鲁 2010年1月29日 15:03
    2010年1月29日 15:01