博文

目前显示的是 十一月, 2016的博文

Windows上如何在进程间传递socket句柄

问题的背景是这样的:我需要写一个程序A,来launch另一个程序B。B是一个server程序,它需要监听一个TCP端口。这个端口是A来分配的,A需要确保这个端口没有被占用。

我最初的办法是这样:写一个函数,遍历现在的tcp table,来判断哪些端口已经在使用中。

std::vector<int> getFreePorts(int begin, int end) { char errbuf[256]; if (end <= begin) { THROW_EXCEPTION("illegal argument"); } std::unique_ptr<MIB_TCPTABLE2, decltype(std::free)*> pTcpTable(nullptr, std::free); ULONG ulSize = sizeof(MIB_TCPTABLE); for (int iter = 0; iter != 2; ++iter) { pTcpTable.reset((MIB_TCPTABLE2*)malloc(ulSize)); ULONG dwRetVal = GetTcpTable2(pTcpTable.get(), &ulSize, FALSE); if (dwRetVal == NO_ERROR) break; if (dwRetVal != ERROR_INSUFFICIENT_BUFFER) { THROW_EXCEPTION("GetTcpTable2 failed,error = %ul", dwRetVal); } } std::vector<uint8_t> avail(end - begin, 1); for (int i = 0; i != pTcpTable->dwNumEntries; ++i) { auto& table = pTcpTable->table[i]; int off = table.dwL…

进程内的生产者/消费者队列,一点思考和总结

考虑这样一个实际场景:
有一个文本文件,需要根据每一行的内容计算出一个score来。将score输出到一个新的文件。假设计算score是很耗费cpu的,请尽可能用多线程并行化的方式来加速。

这时我们会想到map-reduce/work-stealing/bounded queue等等。

拿bounded queue来说,最基本的版本是一个mutex两个条件变量,高级的版本是两个mutex两个条件变量。

接下来要解决的问题是:当生产者读到文件末尾时,如何把这件事情告诉消费者? 假设只有一个生产者,有多个消费者。一种简单的做法是:如果消费者的数量固定,已知,假设有n个,那么就往队列里扔n个特殊类型的消息以标记EOF。当消费者读到EOF则退出。这个思路也可扩展到多个生产者、多个消费者的情况。这是一种纯message-passing的方案。

但是如何处理突发出错?假设生产者读文件的时候突然读到一条错误的记录,它想要立刻通知所有的消费者结束,该怎么办?如果我们的队列是支持优先级的,那么把control message设置成最高优先级,这个问题也可以解决。但如果是反过来,消费者报错,希望所有的生产者和消费者都立刻终止,该怎么办?如果我们有一个双向的queue,这个问题也可以解决。但是往往最简单的方案还是加入一个coordinator,让它具有一个isHealthy()这样的方法。此时,所有条件变量的wait方法都需要改成timed wait。

再看另一个与此很相关的问题:假设我们有n个task,要交给n个线程同时执行。然后主线程需要等待并搜集这n个结果。这个看似很简单,可以用count down latch实现一个栅栏,也可以创建n个feature 然后挨个wait。但是假如要考虑出错的情况,即,子线程在执行的时候可能会遇错提前退出,此时希望其它的线程也尽快停止。

这时候最简单的办法是:让count down latch有一个countDownToZero这样的方法。这样主线程就收到错误直接停止等待了。还有一种办法就是把task result及task id都放入一个unbounded queue中,然后主线程一直wait在这个queue上。

但无论如何,为了能安全且及时的通知其它线程退出,无论什么样的场景,代码中不能有无限时间的wait。所有的wait必须得带有一个时间间隔…