2016年9月26日星期一

How to removes duplicate values from an array: A counterintuitive story of Big O

Everyone knows O(N) is faster than O(NlogN). However, sometimes, it isn't.
Please look at this example:

template <typename T>
size_t count_unique_elements_by_map(T* begin, T* end) {
  std::unordered_set<T> s(end - begin);
  s.insert(begin, end);
  return s.size();
}

template <typename T>
size_t count_unique_elements_by_sort(T* begin, T* end) {
  if (begin >= end) return 0;
  std::sort(begin, end);
  auto newEnd = std::unique(begin,end);
  return newEnd - begin;
}

Method 2 is faster than method 1 if duplicates are rare.

问题的背景是这样:我有一些ID,我想把这些ID对应的value查出来。这些value保存在一个分布式系统中(如memcached)。为了减少服务器的查询压力,我把这些ID先去重,然后计算每个ID在哪个服务器上,把ID按服务器分组,然后发送查询。最初我用了一个hash map做这件事情,然后我发现去重的操作所耗费的CPU远远超过其它代码(比如网络IO)。

然后我发现sort比hash更快,在我的场景下,快10倍以上。但是故事并未到此结束,当我收到服务器的reply之后,我还是得把这些ID以及value插入到一个hash map中。否则我就只能用binary search读取它们。所以,我还是得构建这个hash map,逃不掉的。

再后来我发现,如果把STL的hash map换成我自己写的一个基于open address方式的hash map,那么速度也能提升1倍左右。

Please tell me if you have any better solutions. Thanks.

Torch的tensor

Torch中的tensor是一个n维数组,它是torch最基础最重要的组件。
下面这段代码演示如何通过下标来访问tensor。这段代码首先,初始化一个长度为10的1维数组,将其内容填充成0,1,2...9,然后遍历这个tensor,将其内容打印出来。
#include "TH/TH.h"

#define Real Double
#define real double

int main() {
 THTensor* tensor1 = THTensor_(newWithSize1d)(10);
 for (int64_t i = 0; i != 10; ++i) {
  THTensor_(set1d)(tensor1, i, (real)i);  
 }
 for (int i = 0; i != 10; ++i) {
  printf("%g ", THTensor_(get1d)(tensor1, i));
 }
 printf("\n");
 THTensor_(free)(tensor1);
 return 0;
}
输出应该是:
0 1 2 3 4 5 6 7 8 9
使用下标来访问tensor有两种方式,上面这种是通过set/get函数,比较慢。更快的方式是这样。
#include "TH/TH.h"

#define Real Double
#define real double

int main() {
  THTensor* tensor1 = THTensor_(newWithSize1d)(10);
  int i = 0;
  TH_TENSOR_APPLY(real, tensor1, *tensor1_data = (real)i; ++i;);
  for (int i = 0; i != 10; ++i) {
    printf("%g ", THTensor_(get1d)(tensor1, i));
  }
  printf("\n");
  THTensor_(free)(tensor1);
  return 0;
}
TH_TENSOR_APPLY是一个宏,在TH/THTensorApply.h中声明,它的code很复杂,但是又很需要读一读。本文之后的内容主要介绍这个宏的原理。

首先看下tensor的内部构造。
struct THTensor
{
    long *size;
    long *stride;
    int nDimension;
    
    THStorage *storage;
    long storageOffset;
    int refcount;
    char flag;
}
tensor的实际内容存在THStorage中。拿数据库来做类比,如果把THStorage看成是物理表,那么tensor就是视图。下面是从tensor获取storage指针,然后从storage通过THStorage_(data)函数获得裸指针,然后遍历这个tensor。如下面这段代码所示。
#include "TH/TH.h"

#define Real Double
#define real double

int main() {
 THTensor* tensor1 = THTensor_(newWithSize1d)(10);
 THStorage* s = THTensor_(storage)(tensor1);
 double* data = THStorage_(data)(s);
 int64_t size = THStorage_(size)(s);
 for (int64_t i = 0; i != size; ++i) {
  data[i] = (double)i;
 }
 THTensor_(free)(tensor1);
 return 0;
}
所以当需要实现向量的加减乘除、点乘等操作时,出于效率考虑,肯定会采用后一种方法,而不是采用set1d/get1d。于是问题来了。给定下标,如何计算出offset ?
以一个4维的tensor为例子
tensor[d][c][b][a] 的offset等于  a*stride[0] + b*stride[1] + c *stride[2] + d*stride[3]]
如下面的代码所示:
#include "TH/TH.h"
#include <assert.h>

#define Real Double
#define real double

int main() {
 THTensor* ts = THTensor_(newWithSize4d)(8, 4, 6, 7);
 long stride1 = THTensor_(stride)(ts, 0);
 long stride2 = THTensor_(stride)(ts, 1);
 long stride3 = THTensor_(stride)(ts, 2);
 long stride4 = THTensor_(stride)(ts, 3);
 printf("strides: %ld %ld %ld %ld\n", stride1, stride2, stride3, stride4);
 double* data = THTensor_(data)(ts);
 int a = 2;
 int b = 3;
 int c = 5;
 int d = 2;
 THTensor_(set4d)(ts, a, b, c, d, 12);
 assert(data[a*stride1 + b*stride2 + c *stride3 + d*stride4] == 12);
 THTensor_(free)(ts);
 return 0;
}
输出:
strides: 168 42 7 1
THTensor_(data) 函数返回的并不是struct THTensor中storage的data指针,而是data+storageOffset。

real *THTensor_(data)(const THTensor *self)
{
  if(self->storage)
    return (self->storage->data+self->storageOffset);
  else
    return NULL;
}

大多数时候我们无需关注这个细节。

对于刚new出来的tensor,torch可以保证tensor的最后一维总是连续的。所以strides数组的最后一个元素一定是1。它的strides满足:
stride[d] = isLastDimension(d) ? 1 : size[d+1] * stride[d+1];
但情况并非总是如此。比如一个2维的tensor经过转置之后,它依然和之前的tensor共享同一个storage。比如下面这段代码:
#include "TH/TH.h"
#include <assert.h>
#include <stdexcept>

#define Real Double
#define real double

static void printMatrix(THTensor* tensor) {
 int nDim = THTensor_(nDimension)(tensor);
 if (nDim != 2)
  throw std::runtime_error("dimension error");
 long size0 = THTensor_(size)(tensor, 0);
 long size1 = THTensor_(size)(tensor, 1);
 for (int i = 0; i != size0; ++i) {
  for (int j = 0; j != size1; ++j) {
   printf("%g ", THTensor_(get2d)(tensor, i, j));
  }
  printf("\n");
 }
}

int main() {
 THTensor* tensor1 = THTensor_(newWithSize2d)(3,4);
 THStorage* s = THTensor_(storage)(tensor1);
 long stride1 = THTensor_(stride)(tensor1, 0);
 long stride2 = THTensor_(stride)(tensor1, 1);
 printf("Before transpose:\n");
 printf("strides: %ld %ld\n", stride1, stride2);
 double* data = THStorage_(data)(s);
 int64_t size = THStorage_(size)(s);
 for (int64_t i = 0; i != size; ++i) {
  data[i] = (double)i;
 }
 printf("content:\n");
 printMatrix(tensor1); 
 printf("After transpose:\n");
 THTensor* tensor2 = THTensor_(newTranspose)(tensor1, 0, 1); 
 stride1 = THTensor_(stride)(tensor2, 0);
 stride2 = THTensor_(stride)(tensor2, 1); 
 printf("strides: %ld %ld\n", stride1, stride2);
 printf("content:\n");
 printMatrix(tensor1);
 THTensor_(free)(tensor2);
 THTensor_(free)(tensor1);
 return 0;
}
输出:
Before transpose:
strides: 4 1
content:
0 1 2 3
4 5 6 7
8 9 10 11
After transpose:
strides: 1 4
content:
0 1 2 3
4 5 6 7
8 9 10 11


这时候,如果想要遍历这个转置后的矩阵,就复杂的多了。对于一个n维数组,如果我们想遍历它,可以分配一个counter
size_t counter[dim]; //当前元素的下标
for(int i=0;i!=dim;++i) counter[0] = 0;

然后每遍历一个元素后,用下面的代码递增counter
bool isFinished = false;
for(int i=dim;i>=0;i--){
   counter[i]++;
   if(counter[i] != size[i]) break;
   if(i==0) {
     isFinished = true;
     break;
  }
   counter[i] = 0;
}
当isFinished=true的时候,终止所有循环。
这又变成了刚才的问题,如果我们对每个下标都用一系列的乘法和加法来计算offset,就会变得很低效。于是我们可以这样,用real* tensor1_data指向当前元素,然后在每次修改counter数组的时候,顺便也修正tensor1_data。
    for(i = tensor1_dim; i >= 0; i--) 
    { 
      tensor1_counter[i]++; 
      tensor1_data += tensor1->stride[i]; 

      if(tensor1_counter[i]  == tensor1->size[i]) 
      { 
        if(i == 0) 
        { 
          hasFinished = 1; 
          break; 
        } 
        else 
        { 
          tensor1_data -= tensor1_counter[i]*tensor1->stride[i]; 
          tensor1_counter[i] = 0; 
        } 
      } 
      else 
        break; 
    }
在此基础上可以再有另外一个优化,计算tensor的最低维部分有多少维是连续的,即满足 stride[d] = isLastDimension(d) ? 1 : size[d+1] * stride[d+1];
对于这些维度,我们可以用一个for循环一次遍历完。
完整的代码如下:(How to loop an n-dim array)
  real *tensor1_data = NULL; 
  long *tensor1_counter = NULL; 
  long tensor1_stride = 0, tensor1_size = 0, tensor1_dim = 0, i; 
  int hasFinished = 0; 

  if(tensor1->nDimension == 0) 
    hasFinished = 1; 
  else 
  { 
    tensor1_data = tensor1->storage->data+tensor1->storageOffset; 

    /* what is the first stride (ignore first dims=1)? */ 
    /* it will be used for the whole largest contiguous section */ 
    for(tensor1_dim = tensor1->nDimension-1; tensor1_dim >= 0; tensor1_dim--) 
    { 
      if(tensor1->size[tensor1_dim] != 1) 
        break; 
    } 
    tensor1_stride = (tensor1_dim == -1 ? 0 : tensor1->stride[tensor1_dim]); 

    /* what is the largest contiguous section? */ 
    tensor1_size = 1; 
    for(tensor1_dim = tensor1->nDimension-1; tensor1_dim >= 0; tensor1_dim--) 
    { 
      if(tensor1->size[tensor1_dim] != 1) 
      { 
        if(tensor1->stride[tensor1_dim] == tensor1_size) 
          tensor1_size *= tensor1->size[tensor1_dim]; 
        else 
          break; 
      } 
    } 

    /* counter over found dimensions */ 
    tensor1_counter = (long*)THAlloc(sizeof(long)*(tensor1_dim+1)); 
    for(i = 0; i <= tensor1_dim; i++) 
      tensor1_counter[i] = 0; 
  } 

  while(!hasFinished) 
  { 
    for(i = 0; i < tensor1_size; i++, tensor1_data += tensor1_stride) 
    { 
      //PUT USER CODE HERE 
    } 

    if(tensor1_dim == -1) 
       break; 
 
    tensor1_data -= i*tensor1_stride; 
    for(i = tensor1_dim; i >= 0; i--) 
    { 
      tensor1_counter[i]++; 
      tensor1_data += tensor1->stride[i]; 

      if(tensor1_counter[i]  == tensor1->size[i]) 
      { 
        if(i == 0) 
        { 
          hasFinished = 1; 
          break; 
        } 
        else 
        { 
          tensor1_data -= tensor1_counter[i]*tensor1->stride[i]; 
          tensor1_counter[i] = 0; 
        } 
      } 
      else 
        break; 
    } 
  } 
  THFree(tensor1_counter); 



2016年9月11日星期日

少写代码,多读别人写的代码

我想给我的电脑前贴个条子:“少写代码,多读别人写的代码”
想要成为一个优秀的程序员真的很不容易。
不是说天天读英语听英语,英语水平就会好。只有在刻意学习的时候,才会有收获。
同样的,不是说天天专心写代码,coding能力就会渐涨。除非刻意的想要提高自己,否则,不出几年就会达到自身的瓶颈。
我才多大啊,比我老练的程序员多了去了,如大海里的沙子。
要天天牢记自己是个渣滓,督促自己少写代码,只写必要的。因为写了就得有人维护,就算自己不嫌累,也不要给别人添麻烦添负担。


2015年8月12日星期三

当心Intel® Management Engine埋下openssl的雷

Intel的电脑经常会默认安装一个叫做Intel® Management Engine的软件。这个东西会导致严重的安全漏洞,甚至让很多程序无法运行。

举个例子,我今天自己在windows下编译了一个curl然后运行,发现出错了。报告

curl.exe - Ordinal Not Found
The ordinal 385 could not be located in the dynamic link library C:\os\curl-7.43.0\builds\libcurl-vc14-x64-release-static-ssl-dll-zlib-dll-sspi\bin\curl.exe.

后来我用depends.exe发现,它引用了c:\program files (x86)\intel\icls client\LIBEAY32.DLL这个文件。

而icls client是Intel® Management Engine的一部分,它安装后会把自己加到PATH环境变量中,它还带了两个openssl的dll。这种软件几百年都不会更新一次,想想去年openssl的heart bleed bug。 所以,赶紧,我建议,卸载这个破软件了事。

2015年8月11日星期二

Reduce, Broadcast and AllReduce

在做机器学习的时候,经常要做这样一个操作:每台机器计算出一个vector,然后把这些vector汇总加起来,再广播回去

传统模式

如下面这组图所示:

1.png

2.png

3.png

4.png

总体来说,它可以分为两步:

  1. Reduce。 即map-reduce中的那个reduce。
  2. Broadcast。 把reduce的结果broadcast到所有节点。

直接以这样两步走的方式解决此问题会很低效。如果vector长度很大,那么负责汇聚的那个节点,就会成为性能瓶颈。

树状模式

于是就有了基于tree的map-reduce来解决这个问题。这种方式被广泛用在了Vowpal Wabbit和spark中。

tree1.png

tree2.png

tree3.png

broadcast的过程与reduce的过程类似,我就不画了。

下面举个例子分析下它需要花多少时间。 假设我们有75台机器,采用5叉树的方式来汇聚。那么需要5层。 第一层75台机器。 第二层15台机器。 第三层3台机器。 第四层1台机器。

假设每个Vector的大小是1GB,而带宽是10Gb/s,那么reduce阶段至少需要(5+5+3)*8/10=10秒才能传输完。

采用上一种方案,reduce需要75*1*8/10=60秒。相比之下有很大的改进。

All to All 模式

All to All的模式是每台机器和每台机器都得建立一个连接。

初始状态是这样:

  value1 value2 value3 value4
机器1 1 3 2 1
机器2 2 5 4 5
机器3 8 10 9 7
机器4 6 2 3 6

然后,第i行的数据都汇总到第i%n台机器上。(n为机器数)
如下表所示。红色的代表新修改的值。

  value1 value2 value3 value4
机器1 17 3 2 1
机器2 2 20 4 5
机器3 8 10 18 7
机器4 6 2 3 19

然后broadcast出去

  value1 value2 value3 value4
机器1 17 20 18 19
机器2 17 20 18 19
机器3 17 20 18 19
机器4 17 20 18 19

然后还是按上面的例子算下时间。可以发现,网络开销和机器数量无关。每台机器都需要传输大约2GB*(1-1/n)的数据出去,接收1GB * (1-1/n)的数据进来。所以,大约2秒就可以完成。

汇总比较一下:

  所需时间
传统方法 60秒*2
Tree 10秒*2
All to All 2秒

结合实际

Yahoo的vowpal wabbit是率先把tree allreduce与hadoop结合起来。

spark在reduce阶段提供了tree的模式,实现方法是通过多轮的map-reduce。在broadcast阶段,它默认采用bittorrent的方式做broadcast,这也算是一大创举吧

2015年7月11日星期六

L1-regularized logistic regression

先挖个坑吧,以后慢慢填。

刚读到台大几个学生写的一篇关于L1-regularized logistic regression(L1LR)综述性的论文:“A Comparison of Optimization Methods and Software for Large-scale L1-regularized Linear Classification”。使我很受益,他们把各种思路和方法都罗列了出来,使得我可以按图索骥,一一尝试。不至于自己独自瞎忙活。所以我先挖个坑,逐渐的慢慢更新。

关于问题的背景请参考我的Logistic Regression笔记。我认为L1-regularized logistic regression的核心是如何对一个非平滑(non-smooth)的非线性凸函数求最小值的问题。这在数学上是一个很广的topic,够写厚厚一本书了。我在此试图以L1LR为例探一探路。

关于subgradient与BFGS的关系,在这篇A Quasi-Newton Approach to Nonsmooth Convex Optimization Problems in Machine Learning 论文中讲的比较好。

Boyd的插值法

http://stanford.edu/~boyd/l1_logreg/

论文: An Interior-Point Method for Large-Scale `1-Regularized Logistic Regression

Glmnet

论文: Regularization paths for generalized linear models via coordinate descent . Jerome H. Friedman, Trevor Hastie, and Robert Tibshirani. Journal of Statistical Software, 33(1):1–22, 2010.

L-BFGS-B

这是一个比较经典的算法了,也可以用在这个问题上。

OWLQN

论文: OrthantWise Limited-memory Quasi-Newton "Scalable Training of L1-Regularized Log-Linear Models(2007)"

视频:

Scalable Training of L1-regularized Log-linear Models

OWL-QN主要的改动有这么几点:

1、 梯度在传递给BFGS用之前,要转换成pseudo gradient。

2、 在计算方向的时候,算出的\(H_k∇f_k\)要用\(∇f_k\)修正,sign不同就要取0。
下面的代码中dir是\(-H_k∇f_k\), g是\(∇f_k\)。\(∇f_k\)是pseudo gradient

    private static void constrainSearchDir(double[] dir, double[] g) {
          for (int i = 0; i != g.length; ++i) {
                if (dir[i] * g[i] >= 0.0) {
                       dir[i] = 0.0;
                }
          }
    }

3、 计算出的newX也需要做投影。根据旧的x求newX的规则是

newX[i]=x[i] + step * dir[i];  
newX[i]=newX[i]*((x[i] == 0.0) ? -grad[i] : x[i])>0?newX[i]:0;  

4、 LBFGS中用来计算逆Hessian矩阵的y(i)(即\(\Delta g\)),用的是未修正的梯度。不是那个pseudo gradient,而是OWLQN修正之前的那个梯度!

关于OWL-QN的一些思考

OWL-QN所选择的pseudo gradient其实是f(x)的所有subgradient中最靠近0的一个(范数最小的一个)。

计算逆Hessian矩阵的y(i)之所以用未修正的梯度是因为|x|的二阶导数等于0(当x不等于0)。

另外,论文中未提及,但是实践中发现,OWLQN会break wolfe condition。所以line search的算法只能用backtracking,又因为初始步长是1,所以每次算出来的step必然是<=1的。

OWLQN的 实现

下面罗列一下我所看见的OWLQN的实现(共计4个),并做一下比较

C/C++:

  1. 论文的原始作者提供了demo代码,发布在research.microsoft.com上。它没有检查curvature condition,它使用αp来计算充分下降条件。

  2. libLBFGS 作者是一个日本人Naoaki Okazaki,任教于日本东北大学。 可以说是他首先把OWLQN的坑都踩了一遍。他是首个指出OWLQN不能和MoreThuente一维搜索算法一起用。他没有检查curvature condition,他使用 Δx 来判断充分下降条件.

Java:
stanfordnlp 中也有 OWLQN 的实现。 stanfordnlp顾名思义,是由Stanford NLP group开发并维护的一个机器学习库。它没有检查curvature condition,它使用 Δx 来判断充分下降条件。它每次循环会检查\(y \cdot s > 0 \) 来维护H正定。

Scala:
breeze 我感觉这个库基本上是从stanfordnlp衍生过来的。 It enforces Wolfe and strong Wolfe conditions,所以我很怀疑它实际中是否真的可用,但是实际上spark mllib就是用的它的实现。它使用 αp 来判断充分下降条件。

但是作者又在注释中写到 “Technically speaking, this is not quite right. dir should be (newX - state.x) according to the paper and the author. However, in practice, this seems fine. And interestingly the MSR reference implementation does the same thing (but they don't do wolfe condition checks.).”

参考

Optimization Methods for l1-Regularization Mark Schmidt,2009
A comparison of numerical optimizers for logistic regression Thomas P. Minka October 22, 2003
我的BFGS的笔记
我的Logistic Regression笔记

2015年6月23日星期二

java处理大数组真是让人头疼

我在spark中创建了一个长度为10亿,类型为double[]的accumlator。结果,发现函数send不出去,driver无法把这个函数send给worker

15/06/23 04:57:52 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.OutOfMemoryError: Requested array size exceeds VM limit
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
  at java.util.Arrays.copyOf(Arrays.java:2271)
  at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
  at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
  at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
  at java.io.ObjectOutputStream¥BlockDataOutputStream.drain(ObjectOutputStream.java:1870)
  at java.io.ObjectOutputStream¥BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1779)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1186)
  at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
  at org.apache.spark.util.ClosureCleaner¥.ensureSerializable(ClosureCleaner.scala:312)
  at org.apache.spark.util.ClosureCleaner¥.org¥apache¥spark¥util¥ClosureCleaner¥¥clean(ClosureCleaner.scala:305)
  at org.apache.spark.util.ClosureCleaner¥.clean(ClosureCleaner.scala:132)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
  at org.apache.spark.rdd.RDD¥¥anonfun¥mapPartitions¥1.apply(RDD.scala:683)
  at org.apache.spark.rdd.RDD¥¥anonfun¥mapPartitions¥1.apply(RDD.scala:682)
  at org.apache.spark.rdd.RDDOperationScope¥.withScope(RDDOperationScope.scala:148)
  at org.apache.spark.rdd.RDDOperationScope¥.withScope(RDDOperationScope.scala:109)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
  at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:682)
  at org.apache.spark.api.java.JavaRDDLike¥class.mapPartitionsToDouble(JavaRDDLike.scala:177)
  at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsToDouble(JavaRDDLike.scala:47)
  at com.sunchangming.MyLogisticObjectiveFunction.calculateImpl1(MyLogisticObjectiveFunction.java:159)
  at com.sunchangming.MyLogisticObjectiveFunction.calculate(MyLogisticObjectiveFunction.java:42)
  at com.sunchangming.MyQNMinimizer.minimize(MyQNMinimizer.java:453)
  at com.sunchangming.JavaLR.main(JavaLR.java:219)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.yarn.ApplicationMaster¥¥anon¥2.run(ApplicationMaster.scala:483)

后来我想了一个取巧的办法,把这个accumlator的初始值为null,于是send出去了,但是处理完之后driver收不回来,还是同样的错误。

15/06/23 05:54:55 ERROR executor.Executor: Exception in task 1319.0 in stage 1.0 (TID 1458)
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
   at java.util.Arrays.copyOf(Arrays.java:2271)
   at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
   at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
   at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1870)
   at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1779)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1186)
   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:252)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:724)

对应的代码如下:
Executor:run():
  val accumUpdates = Accumulators.values
  val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
  val serializedDirectResult = ser.serialize(directResult)

无奈了。