Wednesday, April 29, 2015

Apache Spark DataFrame SQL vs RDD functions

A DataFrame in Spark is a wrapper around an RDD, for example an RDD of case classes. You can run SQL queries against a DataFrame, which is convenient. Eventually the SQL is translated into RDD operations, but there are some differences. Let's compare sorting 2 billion records: the RDD "sortBy" function vs. the DataFrame SQL "order by" clause.

Create 2 billion rows of MyRecord in 2000 partitions, so each partition holds 1 million rows. (We should not use fewer partitions, because then the number of rows per partition becomes too large to sort on a commodity node.)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class MyRecord(time: Double, id: String)
val rdd = sc.parallelize(1 to 2000, 2000).flatMap(x =>
  Seq.fill(1000000)(MyRecord(util.Random.nextDouble, "xxx")))
Let's sort this RDD by time:
val sorted = rdd.sortBy(x => x.time)
sorted.count
It finished in about 8 minutes on my cluster of 8 nodes, and everything was fine. You can also check the completed tasks in the Spark web UI: the number of reducers was equal to the number of partitions, i.e. 2000.
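As a quick check outside the web UI: "sortBy" defaults its numPartitions argument to the parent RDD's partition count, so the sorted RDD should report the same number of partitions:
sorted.partitions.length // 2000, same as the input RDD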

Let's convert the original RDD to a DataFrame and sort it again:
val df = sqlContext.createDataFrame(rdd)
df.registerTempTable("data")
val result = sqlContext.sql("select * from data order by time")
result.count
It will run for a while and then crash. If you check the tasks in the Spark web UI, you will see that some of them were cancelled because of lost executors (ExecutorLost) with rather cryptic exceptions. It is really hard to trace back which executor was the first to be lost; the others follow it like a house of cards. What's the problem? The number of reducers. For the first job it is equal to the number of partitions, i.e. 2000, but for the second it drops to 200. Why? 200 is the default value of
spark.sql.shuffle.partitions
So it is better to set this property to an appropriate value before running SQL queries in Spark.
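For example, in the Spark 1.x shell you can raise the setting to match the 2000 input partitions used above and rerun the query (a sketch; the appropriate value depends on your data volume and cluster):
// Assumption: 2000 matches the partitioning chosen earlier in this post
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
val result = sqlContext.sql("select * from data order by time")
result.rdd.partitions.length // now 2000 reducers instead of the default 200
result.count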
