よしだのブログ

サブタイトルはありません。

Spark / RDD のネストできない!

どうも!最近 Spark を触りたおしているよしだです。

先日、いつものように改修をしていたら、例外がでるようになってしまい、1日つぶしてしまったので皆様が同じ轍を踏むように、共有しようと思います(笑)

結論から言えば、Spark において、 RDD のネストはサポートされません。 具体的には、ある RDD を for ループや map などで処理しているブロックの中で、他のRDDは参照できません。参照しようとすると NullPointerException が出ます。なお、環境は、Apache Spark 1.2.0 です。

この例外ですが、一見すると、原因は参照しようとしているRDD中のデータが Null のように見えるのですが、もちろん実際には Null のデータが無くても発生しますので、トレースにかなり苦戦しました。一生懸命 println しても、発見できません。また、for の直前と、for に入ってすぐで比較すると、入ってすぐの場合に Exception が出るので、スコープの問題かのようにも見えます。 知っているかどうかが全てです!

以下、具体例です。

まず、参照しようとして NullPointerException を出している例です。sc.textFile で2つ RDD を作成し、一方を for ループして、その中でもう一方を参照しようとしています。

scala> val r1 = sc.textFile("s3n://abc-takumiyoshida/datasets/restaurants.csv")
r1: org.apache.spark.rdd.RDD[String] = s3n://abc-takumiyoshida/datasets/restaurants.csv MappedRDD[3] at textFile at <console>:12

scala> val r2 = sc.textFile("s3n://abc-takumiyoshida/datasets/ratings.csv")
r2: org.apache.spark.rdd.RDD[String] = s3n://abc-takumiyoshida/datasets/ratings.csv MappedRDD[5] at textFile at <console>:12

scala> for (r <- r1) {
     |
     | r2 take(5) foreach(println)
     | }

15/02/05 08:52:16 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NullPointerException
        at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1239)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1060)
        at $line13.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
        at $line13.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:17)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:765)

~以下略~

対策としては、以下のように collect を呼んでしまう、というのが一つの手になるかと思います。collect すれば、ただの Array になるので、その中で別の RDD を処理することも可能になります。

scala> val c1 = r1.collect
c1: Array[String] = Array(id,name,property,alphabet,name_kana,pref_id,area_id,station_id1,station_time1,station_distance1,station_id2,station_time2,station_distance2,station_id3,station_time3,station_distance3,category_id1,category_id2,category_id3,category_id4,category_id5,zip,address,north_latitude,east_longitude,description,purpose,open_morning,open_lunch,open_late,photo_count,special_count,menu_count,fan_count,access_count,created_on,modified_on,closed, 2,"ラ・マーレ・ド・茶屋","2F・3F","LA MAREE DE CHAYA","らまーれどちゃや",14,1013,2338,22,1789,2401,28,2240,2867,47,3755,201,0,0,0,0,240-0113,"三浦郡葉山町堀内24-3",35.16.53.566,139.34.20.129,"こちら2.3Fのレストランへのコメントになります。  『ラ・マーレ・ド・茶屋』1F(テラス&バー)へのコメントはそちらにお願いします。    駐車場15台(専用)    06/06/19 営業時間等更新(From東京グルメ)",,0,1,0,1,0,0,5,6535,"2000-09-10 11:22:02","2011-04-22 16...

scala> for (c <- c1) {
     | r2 take(5) foreach(println)
     | }
id,restaurant_id,user_id,total,food,service,atmosphere,cost_performance,title,body,purpose,created_on
156445,310595,ee02f26a,5,0,0,0,0,,"名前は忘れましたが、札幌で食べたお店よりも、全然こっちの方が美味しかったので、載せました。お店も綺麗(新規オープン・・)でランチは結構混んでいます。個人的にはゆったりと食事できるので夜の方がオススメです。  辛さが0倍から50倍まで選べるのもGOOD!、スープも2種類みたいで、友達は黄色がオススメと言っていましたが、自分は赤の方を食べました。かなり美味しかったです。店長も好感のもてるお兄さんでした。  駅近くなので一度お試しあれです!",0,"2006-10-07 05:06:09"

ただし、この方法の注意点は collect を呼ぶと、RDD のデータが driver に全て集約されるので、パフォーマンスが良くありません。データの分量を見比べて、少ない方を collect してループできれば、いい感じになるかと思います。

以上です!

Learning Spark: Lightning-fast Big Data Analysis

Learning Spark: Lightning-fast Big Data Analysis

  • 作者: Holden Karau,Andy Kowinski,Matei Zaharia,Patrick Wendell
  • 出版社/メーカー: Oreilly & Associates Inc
  • 発売日: 2015/02/22
  • メディア: ペーパーバック
  • この商品を含むブログを見る