このブログを検索

2015/11/02

WebHDFSのFile Uploadを1回のRequestで実行するには

curlの -Lを使って自動でリダイレクトさせる。

curl -i -X PUT -T /data/file.csv.tar.bz2 -L "http://hostname:50070/webhdfs/v1/path.to.upload/dt=20140726/test.tar.gz?op=CREATE&overwrite=true"

2015/08/10

java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package

15/08/07 18:13:15 ERROR SparkContext: Error initializing SparkContext.
java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package
at java.lang.ClassLoader.checkCerts(ClassLoader.java:952)
at java.lang.ClassLoader.preDefineClass(ClassLoader.java:666)
at java.lang.ClassLoader.defineClass(ClassLoader.java:794)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)

IntelliJ+SBT + scalatestでSparkのUnit Testするときに上のエラーが出る場合がある。
解決策:
  1. IntelliJ File→Project Structure→Librariesへ遷移。
  2. javax.servletとorg.mortbay.jettyを削除する。

参照:Spark application throws javax.servlet.FilterRegistrationによるとorg.eclipse.jettyとorg.mortbay.jettyがConflictするらしい。

依存関係
  • spark : org.eclipse.jetty
  • hadoop-common: javas.servlet
  • hbase : org.mortbay.jetty
ですが、自分の場合は次のようにhadoop-commonもHBase ない何処かの依存として読み込まれた。
libraryDependencies ++= Seq(
  "org.scalatest" %% "scalatest" % "2.2.1" % "test",
  "org.apache.spark" %% "spark-core" % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-hive" % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.1",
  "org.scalanlp" %% "breeze" % "0.11.2" % "provided",
  "org.scalanlp" %% "breeze-natives" % "0.11.2" % "provided",
  "org.apache.httpcomponents" % "httpclient" % "4.3.6",
  "javax.mail" % "mail" % "1.4",
  "net.liftweb" %% "lift-json" % "2.6",
  "net.liftweb" %% "lift-json-ext" % "2.6",
  "org.apache.hive" % "hive-jdbc" % "0.13.0",
  "com.facebook.presto" % "presto-jdbc" % "0.86")

追記:http://d.hatena.ne.jp/Kazuhira/20130320/1363791795 を参考にして依存関係を調べてみたが、それでも不明。

2015/08/05

Scala : stableSort の結果をMap変換すると [D cannot be cast to scala.collection.Seq が出る。

Scalaの素人が苦戦中に見つけたScalaの癖(?)
”D@ 6ef3e973”のような謎のObjectが生成されて
java.lang.ClassCastException: [D cannot be cast to scala.collection.Seqが発生する。

原因は分からないが、Tuple[Double]=Tuple[ (1.1), (1.2),(1.3),(2.4) ]の状態から直接Mapを作るとこうなる。
参考にIntだと”D”→"I"になる。

以下のような場合発生する。
scala>  val key = "key"
val ts = Array ( (3, 1.3 ), (2,1.2),(1, 1.1), (4, 2.4) )
scala.util.Sorting.stableSort(ts, (e1: (Int, Double), e2: (Int, Double) ) => e1._1 < e2._1)
val rdd = ( "test", (key -> ts.map(_._2)) )
rdd match {
  case (key, ts: Map[String, Seq[Double]])  => {
    println(key)
    ts.map( x => println(x._1 + "/" + x._2))
    }
}
解決は
val rdd = ( "test", (key -> ts.map(_._2).toSeq) )
のようにするとWrappedArray(1.1, 1.2, 1.3, 2.4)のようになりArrayの中にDouble Objectが入れる。

とりあえずここまで

2015/06/29

SQLContext JDBC と rdd.JdbcRDDの使い分け

SparkからJDBC経由でデータロードするときの選択肢は「SQLContext」と「JdbcRDD」の2つがあります。

SQLContextが推薦されてるようで使い方ももっと簡単で使い勝手も良かったですが、それでもJdbcRDDの方が良い場合がありました。

次のSQLContextのJDBCの例ではdbtableにselect文も指定できます。
https://github.com/sujee81/SparkApps/blob/master/spark-load-from-db/src/main/java/com/sparkexpert/Main.java

ですが「(select emp_no, concat_ws(' ', first_name, last_name) as full_name from employees) as employees_name」のような少し微妙なSQLになっています。

その理由はsql.jdbc.JDBCRDD#resolveTableの"SELECT * FROM $table WHERE 1=0"でMeta情報を取得してるからです。

この場合、dbtableのSQLにGroup byなどは指定出来ません。
しかし「JdbcRDD」は自由にSQLがかけるのでGroup byがかけます。

そもそもSQLでではなくsparkのdataframeのGroupbyを使うべきでしょうという指摘もあるでしょう。

データロードの処理を共通化したり、Prestoなどから集計対象データを絞ったり、整形したりが効率や可読性が良いケースもあると思います。

最後に私の場合は、
1つのデータロード用のオブジェクトにSQLを渡して処理してますが、様々なテーブルのデータ整形をSQLで行った方がわかりやすいと感じてます。
例えば、Aテーブルの場合は Group byが必要、Bテーブルの場合はJOINが必要なケースも柔軟にかけるからです。

2015/03/18

Spark’s Shared Variables

MRベースではMR起動時に変数定義と値を設定してするが、Sparkでは共有変数機能を提供してある。

Broadcast Variables:

  •  読み取り専用の値を全体に拡散させて他のノードから参照出来る。
  • SparkContext.broadcast(v)で生成。


Accumulators:

  • 追加(added)のみ可能は変数で MRのCounterまたはSumとして利用できる。
  • SparkContext.accumulator(v)で生成出来る。
  • 各ノードではaddまたは+=のオペレーターで足し算ができる。
  • AccumulatorParamのサブクラスとして独自の型を生成出来る。
  • この変数のupdateはActions(reduce, collect, countなど)の処理の中でのみ実行される。Sparkは各Taskで1回のみ実行されることを保証する。

programming-guide