异步执行task的时候,一定要把完成通知放在最末尾的地方

请看下面这段示例代码。这段程序的目的是计算1*2+3*4+5*6=? 。它试图并行的计算3个乘法,然后汇总。如果乘号的两边是大矩阵或者向量,这样的多线程计算会得到很好的加速。
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <pthread.h>
#include <stdio.h>
#include <thread>
#include <unistd.h>

#define DISALLOW_COPY_AND_ASSIGN(TypeName)                                     \
  TypeName(const TypeName &) = delete;                                         \
  void operator=(const TypeName &) = delete

class SubTask;

void *runSubTask(void *);
class Job {
public:
  Job() {}

  void compute();

  void onSubTaskFinished() {
    if (--active_tasks == 0) {
      cv_.notify_all();
    }
  }
  std::atomic<size_t> active_tasks;

private:
  std::mutex m_;
  std::condition_variable cv_;
  DISALLOW_COPY_AND_ASSIGN(Job);
};

class SubTask {
public:
  int v1;
  int v2;
  std::atomic<int> *result;
  Job *parent;
  int task_id;

  ~SubTask() { printf("Sub task %d finished\n", task_id); } //BUG!!

  void compute() const { *result += v1 * v2; }

private:
  DISALLOW_COPY_AND_ASSIGN(SubTask);
};

void *runSubTask(void *param) {
  std::unique_ptr<SubTask> task((SubTask *)param);
  task->compute();
  task->parent->onSubTaskFinished();
  return nullptr;
}

void Job::compute() {
  int task_count = 3;
  active_tasks = task_count;
  std::atomic<int> result(0);
  for (int i = 0; i != task_count; ++i) {
    pthread_t tid;
    pthread_attr_t attr; 
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    pthread_create(&tid, &attr, runSubTask,
                   new SubTask{2 * i + 1, 2 * i + 2, &result, this, i});
  }
  std::unique_lock<std::mutex> lk(m_);
  cv_.wait(lk, [this]() { return active_tasks == 0; });
  printf("%d\n", result.load());
}

int main() {
  Job j;
  j.compute();
  return 0;
}
上面的代码总体来说没有太大的问题。但是,SubTask的析构函数里面的那行printf有时候会打印不出来。因为有可能的执行步骤是这样:
1. 主线程创建子线程
2. 子线程执行计算,并通过Job::onSubTaskFinished() 通知主线程计算完成
3. 主线程收到通知后打印结果并退出
4. 子线程清理栈上的变量,执行它们的析构函数

更有甚者,如果尝试在SubTask的析构函数中访问Job对象,那么就有可能造成程序崩溃。这种问题在使用线程池执行后台任务时经常会发生,因为对于class job来说,它没有thread.join()这样的方法可以调用。

解决办法就是小心小心再小心。我觉得最好呢,在runSubTask这个函数中就严格禁止在这个函数的栈上使用C++的临时变量,并且
1. 确保完成通知在最后执行
2. 确保完成通知一定会执行。否则主任务可能陷入无限等待,而且这种情况从thread dump很难看出来是卡死在哪了。

评论

此博客中的热门博文

想换个新路由器

这几天玩快手玩的入迷

用java生tensorflow的tfrecord文件