Handling large arrays in Java is a real headache

In Spark I created an accumulator of type double[] with a length of one billion. It turned out the function could not be sent out at all: the driver was unable to ship the closure to the workers, dying with the stack trace below.
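
Roughly, the setup looked like this. A minimal sketch assuming the Spark 1.x AccumulatorParam API; the class and variable names here are made up, not from my actual code:

  import org.apache.spark.Accumulator;
  import org.apache.spark.AccumulatorParam;
  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaSparkContext;

  public class HugeAccumulatorSketch {
      // Element-wise vector sum, so workers can accumulate partial gradients.
      static class DoubleArrayParam implements AccumulatorParam<double[]> {
          public double[] zero(double[] initialValue) {
              return new double[initialValue.length];
          }
          public double[] addInPlace(double[] a, double[] b) {
              for (int i = 0; i < a.length; i++) a[i] += b[i];
              return a;
          }
          public double[] addAccumulator(double[] a, double[] b) {
              return addInPlace(a, b);
          }
      }

      public static void main(String[] args) {
          JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("huge-accum"));
          // 10^9 doubles is 8 GB of heap before serialization even starts.
          Accumulator<double[]> grad =
                  sc.accumulator(new double[1_000_000_000], new DoubleArrayParam());
          // Any job whose closure captures `grad` now has to serialize all of it.
      }
  }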

15/06/23 04:57:52 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.OutOfMemoryError: Requested array size exceeds VM limit
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
  at java.util.Arrays.copyOf(Arrays.java:2271)
  at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
  at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
  at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
  at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1870)
  at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1779)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1186)
  at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:683)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:682)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
  at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:682)
  at org.apache.spark.api.java.JavaRDDLike$class.mapPartitionsToDouble(JavaRDDLike.scala:177)
  at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsToDouble(JavaRDDLike.scala:47)
  at com.sunchangming.MyLogisticObjectiveFunction.calculateImpl1(MyLogisticObjectiveFunction.java:159)
  at com.sunchangming.MyLogisticObjectiveFunction.calculate(MyLogisticObjectiveFunction.java:42)
  at com.sunchangming.MyQNMinimizer.minimize(MyQNMinimizer.java:453)
  at com.sunchangming.JavaLR.main(JavaLR.java:219)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:483)
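
The math explains the error: 10^9 doubles serialize to roughly 8 GB, but Spark's JavaSerializer buffers the entire object in a java.io.ByteArrayOutputStream, and a Java byte[] tops out at about 2^31 - 1 elements. Once the buffer tries to grow past that, Arrays.copyOf throws exactly this OutOfMemoryError. You can reproduce it without Spark at all; a sketch (it needs a heap of roughly a dozen GB just to get past the allocations themselves):

  import java.io.ByteArrayOutputStream;
  import java.io.ObjectOutputStream;

  public class ByteLimitRepro {
      public static void main(String[] args) throws Exception {
          ByteArrayOutputStream bos = new ByteArrayOutputStream();
          ObjectOutputStream oos = new ObjectOutputStream(bos);
          // ~8e9 bytes can never fit in the byte[] backing the stream, so
          // growing it fails with "Requested array size exceeds VM limit".
          oos.writeObject(new double[1_000_000_000]);
      }
  }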

Later I tried a trick: I set the accumulator's initial value to null. That let it be sent out, but once the processing finished the driver could not collect it back; same error.
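
Sketched with made-up names again, the trick is to make both the initial value and zero() null, so the object captured in the closure carries no 8 GB payload and the array only materializes when the first real update arrives:

  static class NullableDoubleArrayParam implements AccumulatorParam<double[]> {
      public double[] zero(double[] initialValue) {
          return null;  // nothing big for the driver to serialize into the closure
      }
      public double[] addInPlace(double[] a, double[] b) {
          if (a == null) return b;
          if (b == null) return a;
          for (int i = 0; i < a.length; i++) a[i] += b[i];
          return a;
      }
      public double[] addAccumulator(double[] a, double[] b) {
          return addInPlace(a, b);
      }
  }

  Accumulator<double[]> grad =
          sc.accumulator((double[]) null, new NullableDoubleArrayParam());

This only fixes the outbound direction, though: once a task has accumulated a full-length array, the executor has to serialize it back inside the task result, and it hits the same 2 GB byte[] ceiling going the other way.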

15/06/23 05:54:55 ERROR executor.Executor: Exception in task 1319.0 in stage 1.0 (TID 1458)
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
   at java.util.Arrays.copyOf(Arrays.java:2271)
   at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
   at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
   at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1870)
   at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1779)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1186)
   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:252)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:724)

The corresponding code, from Spark's Executor.scala (in TaskRunner.run()):

  val accumUpdates = Accumulators.values   // carries the full double[] accumulator value
  val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
  val serializedDirectResult = ser.serialize(directResult)   // same JavaSerializer, same byte[] ceiling

I'm out of ideas.
