进程内的生产者/消费者队列,一点思考和总结

考虑这样一个实际场景:
有一个文本文件,需要根据每一行的内容计算出一个score来。将score输出到一个新的文件。假设计算score是很耗费cpu的,请尽可能用多线程并行化的方式来加速。

这时我们会想到map-reduce/work-stealing/bounded queue等等。

拿bounded queue来说,最基本的版本是一个mutex两个条件变量,高级的版本是两个mutex两个条件变量。

接下来要解决的问题是:当生产者读到文件末尾时,如何把这件事情告诉消费者? 假设只有一个生产者,有多个消费者。一种简单的做法是:如果消费者的数量固定,已知,假设有n个,那么就往队列里扔n个特殊类型的消息以标记EOF。当消费者读到EOF则退出。这个思路也可扩展到多个生产者、多个消费者的情况。这是一种纯message-passing的方案。

但是如何处理突发出错?假设生产者读文件的时候突然读到一条错误的记录,它想要立刻通知所有的消费者结束,该怎么办?如果我们的队列是支持优先级的,那么把control message设置成最高优先级,这个问题也可以解决。但如果是反过来,消费者报错,希望所有的生产者和消费者都立刻终止,该怎么办?如果我们有一个双向的queue,这个问题也可以解决。但是往往最简单的方案还是加入一个coordinator,让它具有一个isHealthy()这样的方法。此时,所有条件变量的wait方法都需要改成timed wait。

再看另一个与此很相关的问题:假设我们有n个task,要交给n个线程同时执行。然后主线程需要等待并搜集这n个结果。这个看似很简单,可以用count down latch实现一个栅栏,也可以创建n个feature 然后挨个wait。但是假如要考虑出错的情况,即,子线程在执行的时候可能会遇错提前退出,此时希望其它的线程也尽快停止。

这时候最简单的办法是:让count down latch有一个countDownToZero这样的方法。这样主线程就收到错误直接停止等待了。还有一种办法就是把task result及task id都放入一个unbounded queue中,然后主线程一直wait在这个queue上。

但无论如何,为了能安全且及时的通知其它线程退出,无论什么样的场景,代码中不能有无限时间的wait。所有的wait必须得带有一个时间间隔。相比而下java就简单些,因为java可以interrupt线程。C/C++中虽然有pthread_cancel这样的方法,但是终归还是太危险。

事情还没完,假如IO是异步的,这问题又复杂许多。举个例子,假设读文件是先发起一个http请求,然后等待callback处理IO完成或异常。那么怎么处理出错呢?我们可以在每个callback的开始加上isHealthy(),此外,还必须知道有没有callback正在执行。如果有,主线程就得继续等。如果没有,必须保证不会有新的callback被触发,然后才能退出。

此博客中的热门博文

少写代码,多读别人写的代码

在windows下使用llvm+clang

tensorflow distributed runtime初窥