Thursday, July 9, 2015

In-place update of a Spark RDD

RDDs in Spark are immutable: when you need to change an RDD, you produce a new one. If you need to change all the data in an RDD frequently, e.g. when running an iterative algorithm, you might not want to spend time and memory creating a new structure on every step. However, when you cache an RDD, its records are kept as live objects in memory, so an operation such as foreach gets references to those objects and can change them. This only works while the RDD stays in memory, in deserialized form, the whole time; if a partition is evicted and recomputed, the changes are lost. The following is a hack around RDD immutability and is not recommended. You also lose fault tolerance, although you can force checkpointing.
// The snippet is meant for spark-shell, where sc is the predefined SparkContext.
// A serializable class whose field i can be incremented in place with inc
class Counter extends Serializable { var i: Int = 0; def inc: Unit = i += 1 }
// Approach 1: create an uncached RDD with 5 instances of this class, one per partition
val rdd = sc.parallelize(1 to 5, 5).map(x => new Counter())
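// try to modify every Counter in place
rdd.foreach(x => x.inc)
// the modification is not visible: all values are still zeroes, because collect
// recomputes the uncached RDD and the map creates fresh Counter objects
rdd.collect.foreach(x => println(x.i))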
// Approach 2: create a cached RDD with 5 instances of the Counter class
val cachedRdd = sc.parallelize(1 to 5, 5).map(x => new Counter()).cache
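// try to modify every Counter in place; this action also populates the cache
cachedRdd.foreach(x => x.inc)
// the modification is visible: all values are ones, because collect reads
// the cached objects that foreach has just changed
cachedRdd.collect.foreach(x => println(x.i))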
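
The trick depends on the cache holding live objects rather than serialized bytes. The following is a minimal plain-Scala sketch of that difference, not Spark code: StorageSketch, toBytes, fromBytes, viaObjectStore and viaByteStore are hypothetical names made up for this illustration, and Java serialization stands in for whatever serializer Spark is configured with. It assumes that a serialized storage level keeps the data as bytes and deserializes a new copy on every read.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Same shape as the Counter class used in the post.
class Counter extends Serializable { var i: Int = 0; def inc: Unit = i += 1 }

// Hypothetical helper object, for illustration only; it is not part of Spark.
object StorageSketch {
  // Turn a Counter into bytes, the way a serialized store would keep it.
  def toBytes(c: Counter): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(c)
    out.close()
    buffer.toByteArray
  }

  // Build a brand-new Counter from the stored bytes.
  def fromBytes(bytes: Array[Byte]): Counter = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[Counter] finally in.close()
  }

  // Object store: every read hands out the same reference.
  def viaObjectStore(): Int = {
    val stored = new Counter()
    stored.inc // first read: mutate the stored object itself
    stored.i   // second read: the change is visible
  }

  // Byte store: every read deserializes a temporary copy.
  def viaByteStore(): Int = {
    val stored = toBytes(new Counter())
    fromBytes(stored).inc // first read: only a temporary copy is mutated
    fromBytes(stored).i   // second read: a fresh copy built from unchanged bytes
  }
}
```

In this sketch StorageSketch.viaObjectStore() returns 1 and StorageSketch.viaByteStore() returns 0. If Spark's serialized and on-disk storage levels behave like the byte store here, an in-place update through foreach would be applied to a temporary copy and would not survive to the next action.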

1 comment:

  1. This works! Thanks~ But one thing still annoys me: I also tried to persist with MEMORY_AND_DISK_SER, but the update fails. Perhaps it's because driver code is unable to update serialized RDDs?
