用Sawzall在map-reduce框架下做数据统计

(这是一篇草稿,记下来给自己备忘。如果有人感兴趣,我整理下写详细点)。

Sawzall是7-8年前,google所用的一套的数据统计系统。在今天看它虽然已经过时了,但是瘦死的骆驼也比马大,它的很多设计点依然非常值得我们学习参考。

下面举一个例子:

有一个视频网站,它有很多注册用户。每个用户每看一个视频,它就会记录下这个用户看了什么视频。日志文件的每一行都是这样的形式:

33433,5454545

前面一个数字是userid(哪个用户),后面一个数字是program_id(看了哪个视频)。

现在要求根据这些日志文件,产生一个新的数据文件,使得我能根据userid能快速的查到这个用户看过哪些视频。只记录视频ID,不用这个视频管看了几次。

例如:

假设输入为:

10,555
20,666
30,777
10,344
30,554

那么输出的结果应该是:

10, (555,344)
20, (666)
30, (777,554)

如果这些日志文件不大,比如,总共加起来也就1-2G,那么太简单了,把所有文件传到同一台机器上,然后随便写段代码,在内存中放一个std::map<int,std::set<int>>这样的结构,把日志文件逐行处理一遍就行了。

可是如果这些日志文件非常大,分散在几千台机器上,总共加起来有几十T。以至于即便产生的最终结果,也有好几T。下面展示一下,Google如何用sawzall解决这个问题。

一共分三个阶段:Map、Reduce、Final。

一、Map:

这个阶段就是在存储数据的机器上(或者离它较近的机器),对原始日志文件做逐行扫描,提取出有用的数据。发送给Reducer。

为了实现这个功能,sawzall代码如下:

"t: table set(100)[int] of int;"
"fields: array of bytes = splitcsvline(input);"
"index: int = int(string(fields[0]),10);"
"value: int = int(string(fields[1]),10);"
"emit t[index] <- value;";

每行的输入在input变量中,是byte[]类型。sawzall语言中,唯一的输出方式就是emit语句。emit语句,是把一个数据,emit到一个table中。当我们执行一个Sawzall process的时候,需要为每个table指定一个emiter(参见:sawzall::RegisterEmitter(const char* name, Emitter* emitter)),或者干脆为process指定一个emitter factory(参见:sawzall::Process::set_emitter_factory)。

文档中有这么一句话:“The emitter is responsible for receiving output and assigning it to the correct entries for aggregation as well as for outputting stored data in a standard format。”

我们交给emitter的是一个个变量,它要把这些信息组装起来,做成一个table entry。emitter本身是一个table entries cache。每个table entry都有一个Memory()方法用于估计内存占用量。于是根据这个,我们就能大致知道emitter的内存占用量。当它大到一定程度的时候,就调用SzlEmitter::Flusher()方法,把这些数据刷到Reducer上去。所以对于一个运行在map-reduce环境中的swazall,很重要的工作就是 自己一个自己的emitter实现,重写emitter的WriteValue(const string& key, const string& value)方法,根据key,hash到不同的Reducer。例如,假设有n台机器,那么按key.hashCode()%n来计算发送到哪个机器。 必须保证相同的key在同一个reducer上。这里的key和value已经是被encode过了,看起来是string,其实是byte[]。所以emitter的输出也是被encode过的。无论你的原始输入是什么,也无论你的程序本身是什么,Mapper阶段的输出是一连串的 std::pair<string, string>。上面的例子中,每一个std::pair<string, string>,实际上是一个std::pair<int, std::set<int> >。如果你想做的通用一点,那么pair.first中,不仅要包含key,还要包含table name。

二、Reduce

Reduce的第一步: Sort

如前面所说,此时的输入是一连串的std::pair<std::string, std::string>。

在进行真正的Reduce之前,一般会先对输入做一次 基于本地硬盘的外排序。排序后的结果应该是这样:

key1,value1

key1,value2

key2,value1

key3,value1

然后将相同的key的value做归并。示例代码如下:

KeyMergedPair* kmp = NULL;
for (int j = 0; j < mapout.size(); j++) {
    KeyValuePair* kvp = &mapout[j];
    if (kmp == NULL || kvp->first != kmp->first) {
        reducein.push_back(KeyMergedPair());
        kmp = &reducein.back();
        kmp->first = kvp->first;
    }
    kmp->second.push_back(kvp->second);
}

所以这个排序操作,是把 std::pair<std::string, std::string> 变成 std::pair<std::string, vector< std::string> >。

Reduce的第二步: 聚合

这个阶段的输入是std::pair<std::string, vector< std::string> > , 输出是std::pair<std::string, std::string> 。

在上个阶段,我们得到的结果已经是

(key1, (value1,value2,value3)

这样的形式。现在要把value1/value2/value3归并在一起。在上面的例子中,就是把3个std::set<int>合并成一个std::set<int>。如果table type不是set,而是sum。即对某一列求和,那么此时就是把很多个整数相加变成一个整数而已。

这个阶段所做的事情貌似看起来很简单,其实是非常非常有技术含量的事情。聚合操作其实一共发生在两个阶段,一个是在执行sawzall代码的时候,另一个是在reduce的时候。聚合器的代码在szl/src/emitters目录下。别被名字迷惑,这里面的类是基本上都是SzlTabWriter的子类。SzlTabWriter其实是一个工厂,它主要是创建和修改 table entries。比如我们需要把同一个key的三个value merge到一起,那么怎么merge呢?不同类型的table有不同的做法。如果是set,就是剔除相同元素。如果是sum,就是累加。这些操作都实现在SzlTabWriter的子类里。

SzlTabWriter通过SzlTabWriter:: CreateSzlTabWriter静态方法创建。

REGISTER_SZL_TAB_WRITER(kind, name)

将name(如” SzlSum”)注册到kind(如”sum”)下。

这些算法有的很简单,有的很精巧。它们的实现代码,少则200行,多则1000行左右。是sawzall中很值得拿出来炫耀的部分。

三、Final

Reduce产生的结果,是按照key range分散在很多台机器上,而且在每台机器上都是按key排好序的。于是要得到最终结果,就是对Reduce的结果做归并排序即可。O(n) 的算法复杂度。

此时还要注意的是,直到Reduce阶段,产生的结果都是Encode过的,也就是std::pair<std::string, std::string> 这样的形式,而不是我们要的int之类的。毫无疑问,在数据最终出去给人看之前,这个阶段必须做decode。

此博客中的热门博文

少写代码,多读别人写的代码

在windows下使用llvm+clang

tensorflow distributed runtime初窥