2017年1月19日星期四

2016我学到了些什么

转眼又是一年过去了,回头看看自己也没做太多事情,尤其是博客基本没写,所以也不记得自己都遇到了些什么问题,都怎么解决的。以后还是要勤快点。哪怕质量差,哪怕错误很多,也比全忘了。

这1年写C++写的很爽。惭愧的是毕业以后我没太多的写过C++。去年一直在心无旁骛的写代码,平均每周开会只有1个小时,没有daily stand up meeting,没有越洋电话。我现在写代码越来越在意debuggability。比如抛出一个异常的时候一定要记录是在哪一行抛的。一个分布式程序,遇到错误的时候如何快捷的找到是哪台机器哪个进程才是错误的根源。我总认为花在debug上的功夫都是无用功,是可以省下来早点下班回家打游戏陪老婆的。心中窃喜之前刷面试题练一次写对无bug还是对工作蛮有用的。

我对Windows异步IO以及IOCP有了更深的了解。此处强烈赞一下Windows的新thread pool API,设计的非常棒!它把IO、用户的task、timer全都整合到了一起,尤其是在cancel请求、shutdown threadpool方面做的很干净。而老的程序,比如使用windows http server API写的程序,只需要稍改几行代码就可以迁移过来。不过总的来说IOCP是个很大的坑,我要学的东西很多。拿着别人的代码我经常看不明白他为什么这么做。比如有个大神的代码,他在收到每个IOCP的事件之后,不是直接处理,而是转成一个内部事件Post给IOCP线程池。他这么做必有他的用意吧。

另外把torch7的核心代码看了一遍,基本上是在上下班的地铁上完成的。我觉得对于一个分布式trainer来说,train一个逻辑回归和train一个经典的Feedforward neural network并没有太大区别。90%以上的代码都是一样的。你都需要有map-reduce或者parameter server,这才是最耗工程量的。backpropagation固然重要,但是torch和tensorflow竟然都是在脚本层实现backpropagation。Auto differentiation是一件听上去很神奇但是实现起来毫无新颖的东西。就跟数据库事务能回滚一样。tensorflow的代码我只看了一部分,总体来说真是简单明了,结构上特别清晰,干净。实物不如paper那么让人有想象力让人惊叹。

嗯,还有就是意外收到了google的foobar的邀请,然后把题全做完了。基本上每个题都能让我学到一些新东西,印象最深的就是马尔科夫链。很多图算法以前只是看书,从来没写过,这次也写了一遍。一次写对无需debug还是挺happy的。失望的是做完并没有什么卵用,只是被google的hr邀请去面试而已,可我已经被邀请数次了丝毫不感兴趣。你们倒是给我发件T-shirt啊!上次那件唐僧师徒的Android T-shirt 我甚是喜欢。

机器学习方面算是入了NLP的坑,主要是看论文,对传统的神经网络语言模型有了大致了解,接下来还需要实践的用一用。word2vec/fastText的代码看了些许,希望今年把fastText读完。另外就是把gbdt、lambda rank之类的相关论文和代码读了读,对google的Sibyl以及他们的GBM的实现特别感兴趣,很想自己用同样的方法试试。对于他们能纯用map-reduce来实现这些我真的很佩服,尤其是用那些tricky的办法来降低map-reduce的调度延迟。

Markdown让我伤透了心,再也不想碰了。辛辛苦苦的东西总是在render的时候给我意外,过了很久之后我才发现很多地方不对,气得我直接把博客的数据库回档到1年前了。

2017年1月4日星期三

关于TCP可靠性的一点思考,借此浅谈应用层协议设计

本文主要讨论如何设计一个可靠的RPC协议。TCP是可靠的传输协议,不会丢包,不会乱序,这是课本上讲述了无数遍的道理。基于TCP的传输理论上来说都是可靠的,但是实际这也得看场景。当我做网络游戏的时候也是一直把它当一个可靠的传输协议来用,从没考虑过TCP丢包的问题。直到当我面临像网络存储、机器学习这样领域时,我发现TCP变得“不可靠”了。

具体来说:
  1. 发送方能不能知道已发送的数据对方是不是都收到了?或者,收到了多少?答:不能
  2. 如果怀疑对方没收到,有没有办法可以确认对方没有收到? 答:不能
  3. 我想发送的是“123”,对方收到的会不会是“1223”? 答:是的,会这样,而且无法避免。
第一个问题看起来很傻,众所周知TCP有ACK啊,ACK就是用来对方通知接收到了多少个字节的。可是,实际情况是,ACK是操作系统的事儿,它收到ACK后并不会通知用户态的程序。发送的流程是这样的:
  1. 应用程序把待发送的数据交给操作系统
  2. 操作系统把数据接收到自己的buffer里,接收完成后通知应用程序发送完成
  3. 操作系统进行实际的发送操作
  4. 操作系统收到对方的ACK
问题来了,假如在执行完第二步之后,网络出现了暂时性故障,TCP连接断了,你该怎么办?如果是网络游戏,这很简单,把用户踢下线,让他重新登录去,活该他网不好。但是如果比较严肃的场合,你当然希望能支持TCP重连。那么问题就来了,应用程序并不知道哪些数据发丢了。

以Windows I/O completion ports举个例子。一般的网络库实现是这样的:在调用WSASend之前,malloc一个WSABuffer,把待发送数据填进去。等到收到操作系统的发送成功的通知后,把buffer释放掉(或者转给下一个Send用)。在这样的设计下,就意味着一旦遇上网络故障,丢失的数据就再也找不回来了。你可以reconnect,但是你没办法resend,因为buffer已经被释放掉了。所以这种管理buffer的方式是一个很失败的设计,释放buffer应当是在收到response之后。

Solution:不要依赖于操作系统的发送成功通知,也不要依赖于TCP的ACK,如果你希望保证对方能收到,那就在应用层设计一个答复消息。再或者说,one-way RPC都是不可靠的,无论传输层是TCP还是UDP,都有可能会丢。

第二个问题,是设计应用层协议的人很需要考虑的,简单来说,“成功一定是成功但失败不一定是失败”。我想举个例子。假如你现在正在通过网银给房东转账交房租,然后网银客户端说:“网络超时,转账操作可能失败”。你敢重新再转一次吗?我打赌你不敢。

再举个例子,假设你设计了一个分布式文件存储服务。这个服务只有一条“Append”协议:
  1. 客户端向服务器发送文件名和二进制data。
  2. 服务器把文件打开(不存在则创建),写入数据,然后返回“OK”。中途遇到任何错误则返回“FAIL”
假设你现在有一个20TB的文件,你按照1 GB、1 GB的方式往上传。每发送1 GB,收到OK后,继续发送下1 GB。然后不幸的是中途遇到一个FAIL,你该怎么办?能断点续传吗?NO。因为服务器有可能在写入成功的情况下也返回FAIL(或者网络超时,没有任何回复)。所以你不能重发送未完成的请求。如果你选择从头传,而文件又特别大,那么你可能永远都不会成功。

Solution:采用positioned write。即在客户端发给服务器的请求里加上文件偏移量(offset)。缺点是:若你想要多个客户端同时追加写入同一个文件,那几乎是不可能的。

第三个问题:我想发送的是“123”,对方收到的会不会是“1223”?你想要支持重连、重试,那么你得容忍这种情况发生。

Solution:在应用层给每个message标记一个id,让接收者去重即可。

接下来讨论下如何关闭连接。简单来说:谁是收到最后一条消息的人,谁来主动关闭tcp 连接。另一方在recv返回0字节之后close,千万不要主动的close。

在协议设计上,分两种情况:
  1. 协议是一问一答(类似于HTTP),且发“问”(request)的总是同一方。一方只问,另一方只答
  2. 有显式的EOF消息通知对方shutdown。
如果不满足以上两点的任何一点,那么就没有任何一方能判断它收到的消息是不是最后一条,那协议设计有问题,要改!

(p.s. Windows上还有一种方法,就是用半关连接shutdown(SD_SEND)来标志结束,但是操作起来比较复杂,还不如改协议来的快,容易debug)

2016年12月21日星期三

Windows下在线程池中使用Overlapped IO要格外谨慎

windows有个限制,异步IO操作是绑定在发起IO请求的线程上的。一旦发起者线程退出,那么这个IO操作也就会被取消,而且没有callback会被调用。

MSDN中的WSASend函数的文档中有这么一段话:
"All I/O initiated by a given thread is canceled when that thread exits. For overlapped sockets, pending asynchronous operations can fail if the thread is closed before the operations complete. "

所以在调用WSASend方法时,务必要清楚自己在什么样的线程里。假如是在一个线程池中,并且这个线程池的线程数是可以动态增减的,那么就要小心了。万一你刚Send完,这个线程就exit了,那么就惨惨惨了!这种bug非常非常难以debug出来。

解决办法:
方案1.  ThreadPool的实现者通过GetThreadIOPendingFlag函数来得知是否可以安全退出
方案2.  所有的异步IO操作都先放到一个queue里,然后由特定的线程来处理这个queue

我刚发信问过了windows kernel team负责threadpool的人,他回答说windows自带的thread pool api考虑到了这一点,他在信中说:“Threadpool will not release threads while they have pending IRPs in them。This means your IO will complete even if it completes after you return the thread back to the Threadpool.” 同样的,.net 程序也是如此。

但是,我所见过的所有使用IOCP的程序,几乎全部都是自己制作的thread pool,没有使用操作系统的thread pool API。这些thread pool能实现干干净净shutdown的寥寥无几。




2016年11月15日星期二

Windows上如何在进程间传递socket句柄

问题的背景是这样的:我需要写一个程序A,来launch另一个程序B。B是一个server程序,它需要监听一个TCP端口。这个端口是A来分配的,A需要确保这个端口没有被占用。

我最初的办法是这样:写一个函数,遍历现在的tcp table,来判断哪些端口已经在使用中。

std::vector<int> getFreePorts(int begin, int end)
{
    char errbuf[256];
    if (end <= begin) {
        THROW_EXCEPTION("illegal argument");
    }
    std::unique_ptr<MIB_TCPTABLE2, decltype(std::free)*> pTcpTable(nullptr, std::free);
    ULONG ulSize = sizeof(MIB_TCPTABLE);
    for (int iter = 0; iter != 2; ++iter) {
        pTcpTable.reset((MIB_TCPTABLE2*)malloc(ulSize));
        ULONG dwRetVal = GetTcpTable2(pTcpTable.get(), &ulSize, FALSE);
        if (dwRetVal == NO_ERROR) break;
        if (dwRetVal != ERROR_INSUFFICIENT_BUFFER) {
            THROW_EXCEPTION("GetTcpTable2 failed,error = %ul", dwRetVal);
        }
    }
    std::vector<uint8_t> avail(end - begin, 1);
    for (int i = 0; i != pTcpTable->dwNumEntries; ++i) {
        auto& table = pTcpTable->table[i];
        int off = table.dwLocalPort - begin;
        if (off >= 0 && off < avail.size()) {
            avail[off] = 0;
        }
    }
    std::vector<int> ret;
    for (int i = 0; i != (int)avail.size(); ++i) {
        if (avail[i]) ret.push_back(i + begin);
    }
    return ret;
}
但是很奇怪,子进程bind有时候还是会fail。于是我就开始我的第二个版本:父进程创建socket并bind,子进程去listen并且accept。但是这时候就冒出来一个问题:windows的handle是指针,而不是int。指针怎么跨进程传递……出乎意料的是,它直接强转成size_t来传递。

于是父进程这样创建socket:
 SOCKET ListenSocket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
 if (ListenSocket == INVALID_SOCKET) {
  THROW_EXCEPTION("socket function failed with error: %u\n", WSAGetLastError());   
 }
 // The socket address to be passed to bind
 sockaddr_in service;
 service.sin_family = AF_INET;
 service.sin_addr.s_addr = inet_addr("0.0.0.0");
 service.sin_port = htons(port);
 int iResult = bind(ListenSocket, (SOCKADDR *)&service, sizeof(service));
 if (iResult == SOCKET_ERROR) {
  int err = WSAGetLastError();
  closesocket(ListenSocket);
  THROW_EXCEPTION("bind failed with error %u\n", err);   
 }
 return ListenSocket;
把上面的代码放入一个for循环中,然后把得到的socket通过命令行参数传递给子进程。在调用CreateProcess函数时,bInheritHandles参数应该是TRUE。如果是用java的process类创建子进程,那么不用做什么特别设置,bInheritHandles一直是TRUE。在子进程启动之后,父进程应当关闭这个socket。

子进程这样使用:
#include <WinSock2.h>
#include <Windows.h>
#include <stdio.h>
int main(int argc,char* argv[]) {
 WSADATA wsaData;
 WSAStartup(MAKEWORD(2, 2), &wsaData);
 FILE* fd=fopen("server.log","w");
 SOCKET ListenSocket = _strtoi64(argv[1],nullptr,10);
 //----------------------
 // Listen for incoming connection requests 
 // on the created socket
 if (listen(ListenSocket, SOMAXCONN) == SOCKET_ERROR) {
  int err = WSAGetLastError();
  closesocket(ListenSocket);
  fprintf(fd,"listen function failed with error: %d\n", err);
  return -1;
 }
 while (true) {
  SOCKET AcceptSocket = accept(ListenSocket, NULL, NULL);
  if (AcceptSocket == INVALID_SOCKET) {
   fprintf(fd, "accept failed with error: %ld\n", WSAGetLastError());
   fflush(fd);
   break;
  }
  else {
   fprintf(fd, "Client connected.\n");
   fflush(fd);
  }
 }
 fprintf(fd, "shutdown");
 fflush(fd);
 closesocket(ListenSocket); 
 fclose(fd);
 WSACleanup();
 return 0;
}

比较有趣的是,如果通过netstat这样的命令查看,会显示是父进程在监听端口并接受连接。这是一个绕过防火墙的很好的办法。Windows用户喜欢把一些程序列为可信任的,这些程序(比如IE、QQ)不受防火墙规则的限制。那么你只需要往这样的程序注入一小段代码,就可以让任意的程序悄悄绕过防火墙。这比直接修改防火墙规则要隐蔽多了。

上述方法适用于父进程和子进程存在父子关系的情况。如果不是父子关系,那么就得使用WSADuplicateSocket函数。source进程要先获知target进程的pid,然后通过WSADuplicateSocket函数得到一个WSAPROTOCOL_INFO结构体,然后自己想办法把WSAPROTOCOL_INFO这个结构体传递给target进程。

2016年11月14日星期一

进程内的生产者/消费者队列,一点思考和总结

考虑这样一个实际场景:
有一个文本文件,需要根据每一行的内容计算出一个score来。将score输出到一个新的文件。假设计算score是很耗费cpu的,请尽可能用多线程并行化的方式来加速。

这时我们会想到map-reduce/work-stealing/bounded queue等等。

拿bounded queue来说,最基本的版本是一个mutex两个条件变量,高级的版本是两个mutex两个条件变量。

接下来要解决的问题是:当生产者读到文件末尾时,如何把这件事情告诉消费者? 假设只有一个生产者,有多个消费者。一种简单的做法是:如果消费者的数量固定,已知,假设有n个,那么就往队列里扔n个特殊类型的消息以标记EOF。当消费者读到EOF则退出。这个思路也可扩展到多个生产者、多个消费者的情况。这是一种纯message-passing的方案。

但是如何处理突发出错?假设生产者读文件的时候突然读到一条错误的记录,它想要立刻通知所有的消费者结束,该怎么办?如果我们的队列是支持优先级的,那么把control message设置成最高优先级,这个问题也可以解决。但如果是反过来,消费者报错,希望所有的生产者和消费者都立刻终止,该怎么办?如果我们有一个双向的queue,这个问题也可以解决。但是往往最简单的方案还是加入一个coordinator,让它具有一个isHealthy()这样的方法。此时,所有条件变量的wait方法都需要改成timed wait。

再看另一个与此很相关的问题:假设我们有n个task,要交给n个线程同时执行。然后主线程需要等待并搜集这n个结果。这个看似很简单,可以用count down latch实现一个栅栏,也可以创建n个feature 然后挨个wait。但是假如要考虑出错的情况,即,子线程在执行的时候可能会遇错提前退出,此时希望其它的线程也尽快停止。

这时候最简单的办法是:让count down latch有一个countDownToZero这样的方法。这样主线程就收到错误直接停止等待了。还有一种办法就是把task result及task id都放入一个unbounded queue中,然后主线程一直wait在这个queue上。

但无论如何,为了能安全且及时的通知其它线程退出,无论什么样的场景,代码中不能有无限时间的wait。所有的wait必须得带有一个时间间隔。相比而下java就简单些,因为java可以interrupt线程。C/C++中虽然有pthread_cancel这样的方法,但是终归还是太危险。

事情还没完,假如IO是异步的,这问题又复杂许多。举个例子,假设读文件是先发起一个http请求,然后等待callback处理IO完成或异常。那么怎么处理出错呢?我们可以在每个callback的开始加上isHealthy(),此外,还必须知道有没有callback正在执行。如果有,主线程就得继续等。如果没有,必须保证不会有新的callback被触发,然后才能退出。

2016年10月27日星期四

如何在VC++中强制引用静态库里的所有全局变量

这个坑啊…… 真是历史悠久名声昭著了。

假如你的exe用到了一个静态库,而这个静态库有一些全局变量。那么这些全局变量未必会被引用到这个exe当中。

举个例子:
Foo.h:
===================
class Foo
{
public:
Foo();
};
===================

Foo.cpp:
===================
#include "Foo.h"
#include <stdio.h>


Foo::Foo()
{
printf("hello");
}

static Foo foo;
===================
假如Foo.cpp是被编译成静态库,那么链接的时候可能没有foo这个变量,从而也不会有"hello"输出。

有些人写代码喜欢用全局变量来实现singleton,并且期望这些singleton的构造函数会在main函数之前被调用。呵呵…… 到了vc这里就不灵了。

可是,如果万一他的代码就是这样,你又没法改,办法还是有的。

1. 要以项目引用的方式来引用静态库,而不是把.lib文件作为input给linker
2. 把“link library dependencies” 和 "Use link library dependency inputs” 这两个选项设置成true。

亲测有效。VC++ 2015。


2016年10月22日星期六

如何正确的关闭已打开的文件

如果一个文件是以只读的方式打开的,那么忘记关闭的后果是:句柄泄露。通常来说这不是太大的问题。
如果一个文件是以可写的方式打开的,那么忘记关闭的后果是:除了句柄泄露以外,写进去的东西可能会丢失。并且,如果关闭了,但是忘记检查close(或fclose)函数的返回值,也会有丢数据的风险。

要注意点什么?

写文件的时候,要手动关,不要依赖于析构函数。用RAII来管理IO资源是非常愚蠢的。如果fclose出现在析构函数里,那么通常就是一个错误。因为析构函数不能抛异常,如果fclose返回非0的值,你很难把这个错误报告出去。出错不可怕,最可怕的是出错了但是你却不知道。比如,你的程序每天自动生成一个config文件然后自动推送到线上每台机器。结果有一天,生出来的config文件缺了一块,但是你却不知道,而这个config却被推送出去了。

什么时候fclose会返回非0值?

比如,硬盘满了。再比如,有些机器的硬盘会有一些临时故障,会导致写入偶尔失败。而这种临时故障不一定会产生报警。其实我们希望一旦有这样的问题发生,这些机器能赶紧被应用程序发现,然后从系统中剔除出去。

正确的该怎么写?

FILE* fd = fopen("xxx.txt","w");
if(!fd) return;
try{
  //do normal io stuffs
   ... 
}catch(std::exception& ex){
   if(fclose(fd)){
      //....?
   }
}
if(fclose(fd)) throw std::runtime_error("err");

问题来了,如果我们想把fclose失败这件事情报告出去,就得在处理异常的时候又抛出新的异常。该怎么办?遗憾的是,C++中对于这样的问题,并没有标准的做法。

Java是怎么处理这样的问题?

Java7为此特地推出了Suppressed Exceptions.
比如下面的代码:
package testjava;

import java.io.Closeable;
import java.io.IOException;

public class T1 {

 static class A implements Closeable{

  @Override
  public void close() throws IOException {
   throw new RuntimeException("Err");   
  }
  
 }
 
 static void foo() throws Exception{
  try(A a=new A()){
   throw new RuntimeException("er");
  }
 }
 
 public static void main(String[] args) throws Exception {
  try{
   foo();
  }catch(Exception ex){
   ex.printStackTrace();
  }
 }

}
会产生下面这样的输出:

java.lang.RuntimeException: er
at testjava.T1.foo(T1.java:19)
at testjava.T1.main(T1.java:25)
Suppressed: java.lang.RuntimeException: Err
at testjava.T1$A.close(T1.java:12)
at testjava.T1.foo(T1.java:20)
... 1 more

虽然代码看起来更简单了,但是这其实是把问题隐藏的更深了。谁在处理异常的时候会检查它内部有没有另一个异常呢?所以你其实没有机会把硬盘错误报告出去。

最终最重要的一句话:
“Don't trust your callers, nor your callees, always safeguard your code”