AWS Step FunctionsでTaskにParametersを設定してAWS Lambdaで読み込む
Parameters周りの話があまり見当たらなかったのでまとめました。
やりたいこと
AWS Step FunctionsとAWS LambdaでETLっぽいことをしたい!
初期パラメータはPassで渡し後ろのTaskでもパラメータを渡したい!
想定
下図のようなステートマシンを想定します。
select_task
でDB(RDS)からデータをセレクトしそれをinsert_task
でインサートします。
このとき、select_task
ではParamsSettings
で設定した接続情報を使い、insert_task
ではParameters
で設定した接続情報を使うことにします。
ステートマシンの定義
定義は下記のようになります。
{ "Comment": "ETLフロー", "StartAt": "ParamsSetting", "States": { "ParamsSetting": { "Type": "Pass", "Result": { "rds_host": "host_1", "user_name": "user_1", "password": "password_1", "db_name": "db_1" }, "ResultPath": "$.initial_params", "Next": "select_task" }, "select_task": { "Type": "Task", "Resource": "Lambdaのリソース_1", "InputPath": "$", "ResultPath": "$.select_result", "OutputPath": "$", "Next": "insert_task" }, "insert_task": { "Type": "Task", "Resource": "Lambdaのリソース_2", "InputPath": "$", "ResultPath": "$", "Parameters": { "rds_host": "host_2", "user_name": "user_2", "password": "password_2", "db_name": "db_2", "select_result.$": "$.select_result" }, "End": true } } }
DB接続城はキーバリュー形式でそのまま書くことができますが、select_task
の結果を参照するためには$
を用います。
"select_result.$": "$.select_result"
という部分ですが、キー部分にも$
を含めないとバリューが変数として見なされず文字列としてTaskへ渡されてしまうので要注意です。
AWS Lambda関数
ステートマシンで定義した二つの関数です。
# select_task def lambda_handler(event, context): rds_host = event['initial_params']['rds_host'] user_name = event['initial_params']['user_name'] password = event['initial_params']['password'] db_name = event['initial_params']['db_name'] # DBからデータをSELECTする処理 # 結果をselect_dataへ格納 response = { 'select_data': select_data }
# insert_task def lambda_handler(event, context): rds_host = event['rds_host'] user_name = event['user_name'] password = event['password'] db_name = event['db_name'] select_data = event['select_result']['select_data'] # DBへINSERT処理
Parameters
で設定した値はeventに格納されておりそのままのキーで読み込むことができます。
またevent['select_result']['select_data']
でステートマシンで定義したキーで前のTaskの結果を読み込むことができます。
まとめ
Parameters
で新しく設定する値自体は単純に読み込むことが可能ですが、前のTaskの結果を参照する場合には$
を使う必要があるので要注意です。