apache sparkでファイル名を指定して保存する
メモです!
stackoverflowまとめただけです!
以下バージョンです。
>> pyspark --version SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/home/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.3.1 /_/ Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_171
概要
例えば、こんな感じでデータセットを保存すると。。。
df.coalesce(1).write.json('./data/hoge/', mode='overwrite')
ディレクトリ名までしか指定できない。
$ tree . └── hoge ├── _SUCCESS └── part-00000-0f5a725e-c34d-4c59-9627-debe270aa558-c000.json
そもそもデータでかいんだから一つにまとめんなって話でディレクトリ単位がデータの区分になっていることを想定しているからっぽい。
それでもファイル名を指定したい!っていう時の対処法です。
PySpark
普通に保存してからHadoop APIを使ってリネーム&デリートしてあげる。
df.coalesce(1).write().json("./data/hoge", "overwrite"); from py4j.java_gateway import java_import java_import(spark._jvm, "org.apache.hadoop.fs.Path"); fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()) file = fs.globStatus(sc._jvm.Path('./data/hoge/part*'))[0].getPath().getName() fs.rename(sc._jvm.Path('./data/hoge/' + file), sc._jvm.Path('./data/output/output.json')) fs.delete(sc._jvm.Path('./data/hoge/'), True)
Java
PySparkと同じ。
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; dataset.coalesce(1).write() .mode("overwrite") .format("json") .save("./data/hoge"); try { FileSystem fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext().hadoopConfiguration()); String oldFileName = fs.globStatus(new Path("./data/hoge"+ "/part-*"))[0].getPath().getName(); fs.rename(new Path("./data/hoge", oldFileName), new Path("./data/output/output.json")); fs.delete(new Path("./data/hoge"), true); } catch (IOException e) { e.printStackTrace(); }
Scala
PySparkとJavaと同じ。
Scala知らないのでこれ見てください。
stackoverflow.com