apache sparkでファイル名を指定して保存する
メモです!
stackoverflowまとめただけです!
以下バージョンです。
>> pyspark --version SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.3.1 /_/ Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_171
概要
例えば、こんな感じでデータセットを保存すると。。。
df.coalesce(1).write.json('./data/hoge/', mode='overwrite')
ディレクトリ名までしか指定できない。
$ tree . └── hoge ├── _SUCCESS └── part-00000-0f5a725e-c34d-4c59-9627-debe270aa558-c000.json
そもそもデータでかいんだから一つにまとめんなって話でディレクトリ単位がデータの区分になっていることを想定しているからっぽい。
それでもファイル名を指定したい!っていう時の対処法です。
PySpark
普通に保存してからHadoop APIを使ってリネーム&デリートしてあげる。
df.coalesce(1).write().json("./data/hoge", "overwrite"); from py4j.java_gateway import java_import java_import(spark._jvm, "org.apache.hadoop.fs.Path"); fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()) file = fs.globStatus(sc._jvm.Path('./data/hoge/part*'))[0].getPath().getName() fs.rename(sc._jvm.Path('./data/hoge/' + file), sc._jvm.Path('./data/output/output.json')) fs.delete(sc._jvm.Path('./data/hoge/'), True)
Java
PySparkと同じ。
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; dataset.coalesce(1).write() .mode("overwrite") .format("json") .save("./data/hoge"); try { FileSystem fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext().hadoopConfiguration()); String oldFileName = fs.globStatus(new Path("./data/hoge"+ "/part-*"))[0].getPath().getName(); fs.rename(new Path("./data/hoge", oldFileName), new Path("./data/output/output.json")); fs.delete(new Path("./data/hoge"), true); } catch (IOException e) { e.printStackTrace(); }
Scala
PySparkとJavaと同じ。
Scala知らないのでこれ見てください。
stackoverflow.com
参考
apache sparkでInvalid maximum heap sizeって言われた時の対処
結論から言うとJDKが32bitのせいなので64bitをインストール&環境変数の設定をしましょう。
結論にたどり着くまでに色々ハマったので以下メモ(微妙な情報が混ぜってるかも)
sparkの処理中にjava.io.IOException: ディスクに十分な空き領域がありません。
というエラーが出たので--driver-memory
を指定するとこんな感じのエラーが発生
>>> spark-submit --driver-memory 4g my_app.py Invalid maximum heap size: -Xmx4g The specified size exceeds the maximum representable size. Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit.
ググってみるとJDKが32bitだとヒープ領域として1.5Gしか確保できないので64bitを入れろとのこと。
確認のためjava --version
>>> java --version java 10.0.1 2018-04-17 Java(TM) SE Runtime Environment 18.3 (build 10.0.1+10) Java HotSpot(TM) 64-Bit Server VM 18.3 (build 10.0.1+10, mixed mode)
ちゃんと64bitになっている。。。
てきとーにHelloWorldを作ってヒープ領域の設定が効くか確認
>>> java -Xms4G -Xmx4G HelloWorld HelloWorld
動くやんけ。。。
でも、環境変数にはJava1.8を設定しているので、そもそもjava --verison
でJava10が表示されることがおかしいと気づく。
複数のバージョンのJDKがインストールされていて環境変数がごっちゃになっているのかも?
ググるとこんな記事が。
web.plus-idea.net
上記の通りにjava.exe周辺のリネームと余分な環境変数が設定されていたのでそれを削除
そしてもう一度確認
>>> java -version java version "1.8.0_171" Java(TM) SE Runtime Environment (build 1.8.0_171-b11) Java HotSpot(TM) Client VM (build 25.171-b11, mixed mode) >>> java -Xms4G -Xmx4G HelloWorld Invalid initial heap size: -Xms4G The specified size exceeds the maximum representable size. Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit.
すると、使用されるJavaは1.8になったけどヒープ領域の指定はできず。
というよりインストーラーを見るとまんまと32bitをインストールしていた。
1.8の64bitを再度インストールし各コマンド実行
>>> java -version java version "1.8.0_181" Java(TM) SE Runtime Environment (build 1.8.0_181-b13) Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode) >>> java -Xms4G -Xmx4G HelloWorld Hello World! >>> spark-submit --driver-memory 4g my_app.py
動いた!
よくわからん状況だけどまとめると
・JDKたくさんインストールして環境変数も複数設定されていた
・$JAVA_HOMEはJava10, 64bitを見ていたのでjavaコマンドでのヒープ領域指定はうまくいくが、spark-submit
ではうまくいかない
・spark-submit
ではJava1.8, 32bitを見ていた?
・JDK, 環境変数を整理してJava1.8, 64bitを設定することで解決
apache sparkでStackOverflowErrorとOutOfMemoryErrorが発生した時の対処
以下バージョンです。
dockerコンテナ内で起動しています。
>> pyspark --version SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.3.1 /_/ Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_171
以下本題です。
$SPARK_HOME/bin/spark-submit your_app.py
とかした時、
java.lang.StackOverflowError
ってなったらこう!
$SPARK_HOME/bin/spark-submit --driver-java-options -Xss10m your_app.py
もし、
java.lang.OutOfMemoryError: Java heap space
ってなったらさらにこう!
$SPARK_HOME/bin/spark-submit --driver-java-options -Xss10m --driver-memory 10g your_app.py
10mとか10gとかはてきとーに変えてください。
スタック領域が足りないよ!というエラーですが発生時に特にネストした処理は書いてないんですよね。。。
心当たりがあるとしたらFPGrowth
をminSupport
低めでアソシエーションルールを計算していたことぐらい。
jupyter notebookでpyspark
環境はMacOSです。
sparkは以下
>>> echo $JAVA_HOME /Library/Java/JavaVirtualMachines/jdk1.8.0_122.jdk/Contents/Home >>> pyspark --version Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.3.1 /_/ Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_122-ea
jupyter notebook起動時に$SPARK_HOME/python/pyspark/shell.py
を起動することでSparkContextを作成しnotebook上でpysparkを利用できるようになります。
brew install apache-spark
とかした場合はSPARK_HOMEが設定されていないのでテキトーに設定します。
バージョンは適宜変更してください。
echo export SPARK_HOME=/usr/local/Cellar/apache-spark/2.3.1/libexec >> ~/.bash_profile source ~/.bash_profile
jupyter notebook起動時の設定ファイルを作成します。
mkdir -p ~/.ipython/profile_default/startup/ touch ~/.ipython/profile_default/startup/00-first.py
00-first.py
に以下を記載します。
すでに存在する場合は追記するか50-middle.py
とか作成するといいかもしれません。
# coding: UTF-8 import os import sys spark_home = os.environ.get('SPARK_HOME', None) sys.path.insert(0, spark_home + "/python") # py4j-0.10.7-src.zipのバージョンは適宜変更してください sys.path.insert(0, os.path.join(spark_home, "python/lib/py4j-0.10.7-src.zip")) exec(open(os.path.join(spark_home, "python/pyspark/shell.py")).read())
最後にjupyter notebookを起動しsc
とするとSparkContextが作成されていることが確認できます。
やったぜ!!!
参考は以下書籍のP203 ~ P204です。
www.shuwasystem.co.jp
Macでspark-jobserverのサンプルを動かす&Tips
職場のWin10環境でハマりまくっているのでMacでサクッとやってみる。
基本的にレポジトリ周辺にまとめられているドキュメントを参照しただけです。
まずレポジトリをクローン
github.com
docker
docker run -d -p 8090:8090 sparkjobserver/spark-jobserver:0.7.0.mesos-0.25.0.spark-1.6.2
http://localhost:8090/
にアクセスしてSpark Job Server UIが立ち上がっていることを確認
dockerについては詳しくはこちらへ
github.com
sbt
Scalaなので?sbtを使います。
brew install sbt #クローンしたレポジトリへ移動 sbt job-server-tests/package ls job-server-tests/target/scala-2.11/ # classes job-server-tests_2.11-0.8.1-SNAPSHOT.jar
packageコマンドでjob-server-tests
のサンプルプログラムをjarにします。
初回は諸々ライブラリのインストールで時間がかかるっぽいです。
jobserverにjarをアップロード
作ったjarをアップロードします。
localhostやjarファイル名は環境によって書き換えます。
curl --data-binary @job-server-tests/target/scala-2.11/job-server-tests_2.11-0.8.1-SNAPSHOT.jar localhost:8090/jars/test
UIでjarsタブにjarが登録されたことを確認
サンプルを動かす
# Scalaサンプル curl -d "input.string = a b c a b see" "localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample" #{ # "duration": "Job not done yet", # "classPath": "spark.jobserver.WordCountExample", # "startTime": "2018-07-11T13:06:57.151Z", # "context": "17178e37-spark.jobserver.WordCountExample", # "status": "STARTED", # "jobId": "b9e2d9ed-dcec-44ed-8e6c-66cb5b55b76f" #} curl localhost:8090/jobs/b9e2d9ed-dcec-44ed-8e6c-66cb5b55b76f #{ # "duration": "4.114 secs", # "classPath": "spark.jobserver.WordCountExample", # "startTime": "2018-07-11T13:06:57.151Z", # "context": "17178e37-spark.jobserver.WordCountExample", # "result": { # "a": 2, # "b": 2, # "see": 1, # "c": 1 # }, # "status": "FINISHED", # "jobId": "b9e2d9ed-dcec-44ed-8e6c-66cb5b55b76f" #}
Tips
・ScalaではなくJavaの場合は先にコンテキストを指定する必要がある
curl -d "" "localhost:8090/contexts/jcontext?context-factory=spark.jobserver.context.JavaSparkContextFactory"
・READMEのバージョンをよく見て、spark-jobserver, Spark, Scalaのバージョンをちゃんと合わせないと普通に動かなかったりする
・諸事情でsbtが使えない -> Mavenを使う -> Mavenレポジトリからspark-jobserverが取得できないよ!ってなる
JitPackを使うか直接jarを落として(http://dl.bintray.com/spark-jobserver/maven/)プロジェクトに追加&pomにこんな感じで追加する(このときバージョンミスると辛いことになる)
<dependency> <groupId>spark-jobserver</groupId> <artifactId>job-server-api</artifactId> <scope>system</scope> <version>0.62</version> <systemPath>path/job-server-api_2.11-0.6.2-sources.jar</systemPath> </dependency>