hatunina’s blog

メモと日記です

apache sparkでファイル名を指定して保存する

メモです!
stackoverflowまとめただけです!

以下バージョンです。

>> pyspark --version
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/
                        
Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_171

概要

例えば、こんな感じでデータセットを保存すると。。。

df.coalesce(1).write.json('./data/hoge/', mode='overwrite')


ディレクトリ名までしか指定できない。

$ tree
.
└── hoge
    ├── _SUCCESS
    └── part-00000-0f5a725e-c34d-4c59-9627-debe270aa558-c000.json


そもそもデータでかいんだから一つにまとめんなって話でディレクトリ単位がデータの区分になっていることを想定しているからっぽい。
それでもファイル名を指定したい!っていう時の対処法です。

PySpark

普通に保存してからHadoop APIを使ってリネーム&デリートしてあげる。

df.coalesce(1).write().json("./data/hoge", "overwrite");

from py4j.java_gateway import java_import
java_import(spark._jvm, "org.apache.hadoop.fs.Path");

fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
file = fs.globStatus(sc._jvm.Path('./data/hoge/part*'))[0].getPath().getName()
fs.rename(sc._jvm.Path('./data/hoge/' + file), sc._jvm.Path('./data/output/output.json'))
fs.delete(sc._jvm.Path('./data/hoge/'), True)

Java

PySparkと同じ。

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

dataset.coalesce(1).write()
       .mode("overwrite")
       .format("json")
       .save("./data/hoge");

try {
    FileSystem fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext().hadoopConfiguration());
    String oldFileName = fs.globStatus(new Path("./data/hoge"+ "/part-*"))[0].getPath().getName();

    fs.rename(new Path("./data/hoge", oldFileName), new Path("./data/output/output.json"));
    fs.delete(new Path("./data/hoge"), true);
} catch (IOException e) {
    e.printStackTrace();
}

Scala

PySparkとJavaと同じ。
Scala知らないのでこれ見てください。

stackoverflow.com

参考

stackoverflow.com

FileSystem (Apache Hadoop Main 2.7.3 API)