hatunina’s blog

メモと日記です

jupyter notebookでpyspark

環境はMacOSです。
sparkは以下

>>> echo $JAVA_HOME
/Library/Java/JavaVirtualMachines/jdk1.8.0_122.jdk/Contents/Home

>>> pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/
                        
Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_122-ea


jupyter notebook起動時に$SPARK_HOME/python/pyspark/shell.pyを起動することでSparkContextを作成しnotebook上でpysparkを利用できるようになります。

brew install apache-sparkとかした場合はSPARK_HOMEが設定されていないのでテキトーに設定します。
バージョンは適宜変更してください。

echo export SPARK_HOME=/usr/local/Cellar/apache-spark/2.3.1/libexec >> ~/.bash_profile
source ~/.bash_profile


jupyter notebook起動時の設定ファイルを作成します。

mkdir -p ~/.ipython/profile_default/startup/
touch ~/.ipython/profile_default/startup/00-first.py


00-first.pyに以下を記載します。
すでに存在する場合は追記するか50-middle.pyとか作成するといいかもしれません。

# coding: UTF-8

import os
import sys

spark_home = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, spark_home + "/python")
# py4j-0.10.7-src.zipのバージョンは適宜変更してください
sys.path.insert(0, os.path.join(spark_home, "python/lib/py4j-0.10.7-src.zip"))

exec(open(os.path.join(spark_home, "python/pyspark/shell.py")).read())


最後にjupyter notebookを起動しscとするとSparkContextが作成されていることが確認できます。

f:id:hatunina:20180722004839p:plain
やったぜ!!!

参考は以下書籍のP203 ~ P204です。
www.shuwasystem.co.jp

Macでspark-jobserverのサンプルを動かす&Tips

職場のWin10環境でハマりまくっているのでMacでサクッとやってみる。
基本的にレポジトリ周辺にまとめられているドキュメントを参照しただけです。

まずレポジトリをクローン github.com

docker

docker run -d -p 8090:8090 sparkjobserver/spark-jobserver:0.7.0.mesos-0.25.0.spark-1.6.2 


http://localhost:8090/にアクセスしてSpark Job Server UIが立ち上がっていることを確認
dockerについては詳しくはこちらへ
github.com

sbt

Scalaなので?sbtを使います。

brew install sbt
#クローンしたレポジトリへ移動
sbt job-server-tests/package
ls job-server-tests/target/scala-2.11/
# classes                      job-server-tests_2.11-0.8.1-SNAPSHOT.jar


packageコマンドでjob-server-testsのサンプルプログラムをjarにします。
初回は諸々ライブラリのインストールで時間がかかるっぽいです。

jobserverにjarをアップロード

作ったjarをアップロードします。
localhostやjarファイル名は環境によって書き換えます。

curl --data-binary @job-server-tests/target/scala-2.11/job-server-tests_2.11-0.8.1-SNAPSHOT.jar localhost:8090/jars/test

UIでjarsタブにjarが登録されたことを確認

サンプルを動かす

# Scalaサンプル
curl -d "input.string = a b c a b see" "localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample"

#{
#  "duration": "Job not done yet",
#  "classPath": "spark.jobserver.WordCountExample",
#  "startTime": "2018-07-11T13:06:57.151Z",
#  "context": "17178e37-spark.jobserver.WordCountExample",
#  "status": "STARTED",
#  "jobId": "b9e2d9ed-dcec-44ed-8e6c-66cb5b55b76f"
#}

curl localhost:8090/jobs/b9e2d9ed-dcec-44ed-8e6c-66cb5b55b76f

#{
#  "duration": "4.114 secs",
#  "classPath": "spark.jobserver.WordCountExample",
#  "startTime": "2018-07-11T13:06:57.151Z",
#  "context": "17178e37-spark.jobserver.WordCountExample",
#  "result": {
#    "a": 2,
#    "b": 2,
#    "see": 1,
#    "c": 1
#  },
#  "status": "FINISHED",
#  "jobId": "b9e2d9ed-dcec-44ed-8e6c-66cb5b55b76f"
#}


Tips

ScalaではなくJavaの場合は先にコンテキストを指定する必要がある

curl -d "" "localhost:8090/contexts/jcontext?context-factory=spark.jobserver.context.JavaSparkContextFactory"


・READMEのバージョンをよく見て、spark-jobserver, Spark, Scalaのバージョンをちゃんと合わせないと普通に動かなかったりする

・諸事情でsbtが使えない -> Mavenを使う -> Mavenレポジトリからspark-jobserverが取得できないよ!ってなる
JitPackを使うか直接jarを落として(http://dl.bintray.com/spark-jobserver/maven/)プロジェクトに追加&pomにこんな感じで追加する(このときバージョンミスると辛いことになる)

  <dependency>
    <groupId>spark-jobserver</groupId>
    <artifactId>job-server-api</artifactId>
    <scope>system</scope>
    <version>0.62</version>
    <systemPath>path/job-server-api_2.11-0.6.2-sources.jar</systemPath>
  </dependency>

Windows10でdocker-compose upがコケた時の対処法

メモです。

Win10環境でdocker-compose upすると下記エラー

Error response from daemon: Cannot restart container xxxxx: driver failed programming external connectivity on endpoint xxxxx 

Win10の高速スタートアップ機能の「完全シャットダウン」とやらのせいでコケている様子。

github.com

コントロールパネル -> ハードウェアとサウンド -> 電源オプション -> 電源ボタンの動作の選択 -> 現在利用可能ではない設定を変更します -> 「高速スタートアップを有効にする (推奨)」のチェックを外す

mechalog.com

これでイケる!!!

「前処理大全」を読みました

読みました。

gihyo.jp

ドッグイヤーしまくりました。 最近はデータ分析やらダッシュボード・プロトタイプ作成が主な業務になっているので、こういうデータ欲しいんだけどどう書くのが効率的かな〜って時に逆引きしてます。 特にSQLはちょっと苦手なので、目的に対してSQL, R, Pythonとコードが分けてあり理解が捗ります。 実務的な話がたくさん書いてあるのも良かったです。 あと表紙がかっこいいです。

pd.Seriesに辞書を渡すとキーで昇順にソートされる

Seriesを作る際にindexを指定した場合、指定したindexはリストなので順番を持っています。
なので、リストの順番でSeriesが作られます。

hoge = pd.Series([1, 2, 3, 4, 5], index=['C','A','B','E','D'])
print(hoge)

# 実行結果
C    1
A    2
B    3
E    4
D    5
dtype: int64


これに対し、辞書を渡した場合はどうなるでしょう。

huga = pd.Series({'C': 1, 'A': 2, 'B': 3, 'E': 4, 'D': 5})
print(huga)

# 実行結果
A    2
B    3
C    1
D    5
E    4
dtype: int64


辞書は順番を持たないので、出力はてきと〜になるかと思いきや、渡した辞書のキーで昇順になっています。
うーむ、なんか直感に反するぞ。。。

調査

ということで調査です。
しかし、ドキュメントを読んでみてもそれらしき記述は見つからず。
ググっても検索ワードが思いつかずそれらしき情報も見当たらず。
なので、pyCharmでライブラリをデバッグしてみることに。

すると、Seriesクラスのコンストラクタにこんなコードが!

            elif isinstance(data, dict):
                if index is None:
                    if isinstance(data, OrderedDict):
                        index = Index(data)
                    else:
                        index = Index(_try_sort(data))


ここで、dataというのはSeriesに渡された変数です。
要するに、pd.Series(data)とした時に辞書が渡されているかつindexの指定がない場合は_try_sortとやらを実行するとのこと。
OrderedDictは順番が保持されているのでそれをそのまま使うみたいですね。

で、_try_sortを見ると、、、

def _try_sort(iterable):
    listed = list(iterable)
    try:
        return sorted(listed)
    except Exception:
        return listed


ソートしてる〜
これやんけ〜

ということで、pd.Seriesに辞書を渡すとキーで昇順にソートされる、というお話でした。