hatunina’s blog

メモと日記です

Macでspark-jobserverのサンプルを動かす&Tips

職場のWin10環境でハマりまくっているのでMacでサクッとやってみる。
基本的にレポジトリ周辺にまとめられているドキュメントを参照しただけです。

まずレポジトリをクローン github.com

docker

docker run -d -p 8090:8090 sparkjobserver/spark-jobserver:0.7.0.mesos-0.25.0.spark-1.6.2 


http://localhost:8090/にアクセスしてSpark Job Server UIが立ち上がっていることを確認
dockerについては詳しくはこちらへ
github.com

sbt

Scalaなので?sbtを使います。

brew install sbt
#クローンしたレポジトリへ移動
sbt job-server-tests/package
ls job-server-tests/target/scala-2.11/
# classes                      job-server-tests_2.11-0.8.1-SNAPSHOT.jar


packageコマンドでjob-server-testsのサンプルプログラムをjarにします。
初回は諸々ライブラリのインストールで時間がかかるっぽいです。

jobserverにjarをアップロード

作ったjarをアップロードします。
localhostやjarファイル名は環境によって書き換えます。

curl --data-binary @job-server-tests/target/scala-2.11/job-server-tests_2.11-0.8.1-SNAPSHOT.jar localhost:8090/jars/test

UIでjarsタブにjarが登録されたことを確認

サンプルを動かす

# Scalaサンプル
curl -d "input.string = a b c a b see" "localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample"

#{
#  "duration": "Job not done yet",
#  "classPath": "spark.jobserver.WordCountExample",
#  "startTime": "2018-07-11T13:06:57.151Z",
#  "context": "17178e37-spark.jobserver.WordCountExample",
#  "status": "STARTED",
#  "jobId": "b9e2d9ed-dcec-44ed-8e6c-66cb5b55b76f"
#}

curl localhost:8090/jobs/b9e2d9ed-dcec-44ed-8e6c-66cb5b55b76f

#{
#  "duration": "4.114 secs",
#  "classPath": "spark.jobserver.WordCountExample",
#  "startTime": "2018-07-11T13:06:57.151Z",
#  "context": "17178e37-spark.jobserver.WordCountExample",
#  "result": {
#    "a": 2,
#    "b": 2,
#    "see": 1,
#    "c": 1
#  },
#  "status": "FINISHED",
#  "jobId": "b9e2d9ed-dcec-44ed-8e6c-66cb5b55b76f"
#}


Tips

ScalaではなくJavaの場合は先にコンテキストを指定する必要がある

curl -d "" "localhost:8090/contexts/jcontext?context-factory=spark.jobserver.context.JavaSparkContextFactory"


・READMEのバージョンをよく見て、spark-jobserver, Spark, Scalaのバージョンをちゃんと合わせないと普通に動かなかったりする

・諸事情でsbtが使えない -> Mavenを使う -> Mavenレポジトリからspark-jobserverが取得できないよ!ってなる
JitPackを使うか直接jarを落として(http://dl.bintray.com/spark-jobserver/maven/)プロジェクトに追加&pomにこんな感じで追加する(このときバージョンミスると辛いことになる)

  <dependency>
    <groupId>spark-jobserver</groupId>
    <artifactId>job-server-api</artifactId>
    <scope>system</scope>
    <version>0.62</version>
    <systemPath>path/job-server-api_2.11-0.6.2-sources.jar</systemPath>
  </dependency>

Windows10でdocker-compose upがコケた時の対処法

メモです。

Win10環境でdocker-compose upすると下記エラー

Error response from daemon: Cannot restart container xxxxx: driver failed programming external connectivity on endpoint xxxxx 

Win10の高速スタートアップ機能の「完全シャットダウン」とやらのせいでコケている様子。

github.com

コントロールパネル -> ハードウェアとサウンド -> 電源オプション -> 電源ボタンの動作の選択 -> 現在利用可能ではない設定を変更します -> 「高速スタートアップを有効にする (推奨)」のチェックを外す

mechalog.com

これでイケる!!!

「前処理大全」を読みました

読みました。

gihyo.jp

ドッグイヤーしまくりました。 最近はデータ分析やらダッシュボード・プロトタイプ作成が主な業務になっているので、こういうデータ欲しいんだけどどう書くのが効率的かな〜って時に逆引きしてます。 特にSQLはちょっと苦手なので、目的に対してSQL, R, Pythonとコードが分けてあり理解が捗ります。 実務的な話がたくさん書いてあるのも良かったです。 あと表紙がかっこいいです。

KFoldでクロスバリデーション

メモです。

サンプル

とりあえずndarrayを定義

import numpy as np

X = np.array([[1, 2], [3, 4], [5, 6], [7, 8]])
y = np.array([1, 2, 3, 4])

print(X)
print('=========')
print(y)

# [[1 2]
# [3 4]
# [5 6]
# [7 8]]
# =========
# [1 2 3 4]


n_splitsで区切る数を指定する。デフォルトは3とのこと。
他にもshuffle, random_stateが指定できる。
split関数で対象のデータセットを渡すとindexndarrayで返ってくるのでそれを使ってデータセットを分けていく。

from sklearn.model_selection import KFold

kf = KFold(n_splits=3)

for train_index, test_index in kf.split(X):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]
    
    print('--- X_train ---')
    print(X_train)
    print('--- X_test ---')
    print(X_test)
    print('--- y_train ---')
    print(y_train)
    print('--- y_test ---')
    print(y_test)
    print('=== next split ===‘)

# --- X_train ---
# [[5 6]
# [7 8]]
# --- X_test ---
# [[1 2]
# [3 4]]
# --- y_train ---
# [3 4]
# --- y_test ---
# [1 2]
# === next split ===
# --- X_train ---
# [[1 2]
# [3 4]
# [7 8]]
# --- X_test ---
# [[5 6]]
# --- y_train ---
# [1 2 4]
# --- y_test ---
# [3]
# === next split ===
# --- X_train ---
# [[1 2]
# [3 4]
# [5 6]]
# --- X_test ---
# [[7 8]]
# --- y_train ---
# [1 2 3]
# --- y_test ---
# [4]
# === next split ===


参考

sklearn.model_selection.KFold — scikit-learn 0.19.1 documentation

pd.factorizeでカテゴリ変数を数値に変換する

カテゴリ変数をユニークな数値に変換してくるやつです(ダミー変数ではない)。

pd.factorize

まず、こんなデータフレームがあったとします。

import pandas as pd
df = pd.DataFrame({'列1': ['b', 'b', 'a', 'c', 'b'], '列2': ['あ', 'い', 'い', None, 'え']})
print(df)

# 出力
#   列1    列2
# 0  b     あ
# 1  b     い
# 2  a     い
# 3  c  None
# 4  b     え


とにかく文字列を数値に変換したい。
そんな時はpd.factorizeがなんとかしてくれる。

for column in df.columns:
    labels, uniques = pd.factorize(df[column])
    df[column] = labels
    
print(df)

# 出力
#    列1  列2
# 0   0   0
# 1   0   1
# 2   1   1
# 3   2  -1
# 4   0   2


引数にはシーケンス(list、Series)を渡します。
すると、カテゴリ変数を数値に変換したndarrayが返ってくるのでそれを元のデータフレームに戻します。
この時、Noneは-1に変換されます。
labelsuniquesの出力は以下のようになっています。

for column in df.columns:
    labels, uniques = pd.factorize(df[column])
    df[column] = labels
    print(labels)
    print(uniques)
    print('===')

# 出力
# [0 0 1 2 0]
# Index(['b', 'a', 'c'], dtype='object')
# ===
# [0 1 1 2 3]
# Index(['あ', 'い', 'え'], dtype='object')
# ===

get_indexer

uniquesにユニークなラベルが入っているわけですが、これをget_indexerを使うことで別のデータフレームのカテゴリ変数を同じ数値で変換することができます。

新しいデータフレームを定義します。

import pandas as pd
df = pd.DataFrame({'列1': ['b', 'b', 'a', 'c', 'b'], '列2': ['あ', 'い', 'い', None, 'え']})
df2 = pd.DataFrame({'列1': ['a', 'a', 'a', 'c', 'b'], '列2': ['え', 'あ', 'い', 'お', None]})
print(df)
print('\n')
print(df2)

# 出力
#   列1    列2
# 0  b     あ
# 1  b     い
# 2  a     い
# 3  c  None
# 4  b     え

#   列1    列2
# 0  a     え
# 1  a     あ
# 2  a     い
# 3  c     お
# 4  b  None


get_indexerに新しいデータフレームの列を渡して、変換します。

for column in df.columns:
    labels, uniques = pd.factorize(df[column])
    df[column] = labels
    df2_labels = uniques.get_indexer(df2[column])
    df2[column] = df2_labels
    
print(df)
print('\n')
print(df2)

# 出力
#    列1  列2
# 0   0   0
# 1   0   1
# 2   1   1
# 3   2  -1
# 4   0   2

#    列1  列2
# 0   1   2
# 1   1   0
# 2   1   1
# 3   2  -1
# 4   0  -1


元のデータフレームに存在しないカテゴリ変数はNoneと同様に-1に変換されます。

トレーニングセットで行った変換をテストセットにも適用する際に便利ですね。

参考

pandas.factorize — pandas 0.23.0 documentation

pandas.Index.get_indexer — pandas 0.23.0 documentation