我的mapper终于跑起来了。

我最近在做一套数据统计平台。这周,写了一个工具,从mysql里面读数据,转换成protocol buffers的格式,然后写入到leveldb中。测试了一下性能,大概是每秒2万条左右(单线程),每条记录长度100个字节。weibo上@郝立华 说“mysql自己dump成sql的速度比你这个快多了”,但是我比较怀疑。因为mysql一秒钟想插入几万条记录,还是很困难的。However,我的最主要的目的是转格式,因为sawzall的input只认两种格式,protocol buffers和plain text。

如我之前的blog所说,这个数据流,主要分三个处理阶段:map、sort、reduce。google的文档中常说第二个阶段是shuffer/sort。我不是很明白此处shuffer的意义。我今天把map阶段整了一个demo出来,从leveldb里面读数据,然后做统计。我这个demo所做的统计相当于从httpd的access log统计每个页面的访问次数,实际运行效率,大概是每秒12万条左右。我没有想到的是,在硬盘带宽未跑满之前,先把CPU跑满了。整个进程CPU占有率始终在150%左右(我主逻辑是一个单线程的循环,后台还有一个线程是leveldb的后台线程)。

然后我就一直在思考一个问题,把数据放哪。HDFS,还是支持水平切分的key-value database?下面分别说下我的想法。

我最想要的是一个列存储的数据库,可惜没有合适的,而且没想好怎么把它和sawzall搭配起来,放弃。

然后我就想,要一个Riak这样的东西。它其实很简单,就是把key,散列到多台机器上去,分散存储。然后我在每个机器上遍历各自的数据库(map阶段),然后发送到中央机器做汇总(reduce阶段)。这个想法是多么的朴素简洁啊。

如果我不想用riak呢,如果不想因为rpc带来额外的开销,那么就在硬盘上放很多个目录,每个目录对应一个leveldb的db。然后自己打开目录读去,多简单啊。我当时还在想,要不要把不同人的任务合并在一起执行。比如,我现在正在扫描周一到周日的数据,正扫到周三了。恰好另一个任务进来,也要这么扫。那么你不妨跟我先一起从周三跑到周日,然后我就结束了。你再从周日跑到周一。

但是google不是这么做的。google的map reduce框架中,reducer从来不只一个 ,一般是几千个,略大于总机器数。而mapper更多了。它是把数据存在GFS里面。GFS的每个block是64MB,然后每个block对应一个Map任务,那么1T数据就对应着1万多个mapper,此时,显然需要很多个reducer啊!

于是我就有另外一个想法:我用zfs create一个新的zfs,这个zfs就只有一个目录,里面有好多个文件,每个文件大小都是64MB,或者略小。文件名就是Blockid。那么我找一个master机器,记录record key range \<–> blockid 的映射。那么在创建任务的时候,根据keyrange把任务分发的对应的机器上去即可。fread/fwrite的效率要远高于leveldb。反正我写数据的时候都是先序列化再写,那么序列化之后我就知道datasize了。先写2字节的datasize,再写序列化后的data。无论是读还是写,代码都很容易实现。

然后最重要的是,数据和任务可以不在一台机器上。当然,我是尽力想让它们在一台机器上,但是如果遇上今天这样,硬盘还没跑满,CPU先满了,那么我就把这个文件直接sendfile出去,发到另一台机器上执行。如果是千M网,传输这么大的数据块,overhead应该很小。于是任务调度就是一件很有趣很值得思考的事情。

我年前专门查了,hdfs有C API,虽然未必好使。

此博客中的热门博文

在windows下使用llvm+clang

少写代码,多读别人写的代码

tensorflow distributed runtime初窥