hatunina’s blog

メモと日記です

AWS Step FunctionsでTaskにParametersを設定してAWS Lambdaで読み込む

Parameters周りの話があまり見当たらなかったのでまとめました。

やりたいこと

AWS Step FunctionsとAWS LambdaでETLっぽいことをしたい!
初期パラメータはPassで渡し後ろのTaskでもパラメータを渡したい!

想定

下図のようなステートマシンを想定します。 select_taskでDB(RDS)からデータをセレクトしそれをinsert_taskでインサートします。
このとき、select_taskではParamsSettingsで設定した接続情報を使い、insert_taskではParametersで設定した接続情報を使うことにします。 f:id:hatunina:20190116215747p:plain

ステートマシンの定義

定義は下記のようになります。

{
  "Comment": "ETLフロー",
  "StartAt": "ParamsSetting",
  "States": {
    "ParamsSetting": {
      "Type": "Pass",
      "Result": {
        "rds_host": "host_1",
        "user_name": "user_1",
        "password": "password_1",
        "db_name": "db_1"
      },
      "ResultPath": "$.initial_params",
      "Next": "select_task"
    },
    "select_task": {
      "Type": "Task",
      "Resource": "Lambdaのリソース_1",
      "InputPath": "$",
      "ResultPath": "$.select_result",
      "OutputPath": "$",
      "Next": "insert_task"
    },
    "insert_task": {
      "Type": "Task",
      "Resource": "Lambdaのリソース_2",
      "InputPath": "$",
      "ResultPath": "$",
      "Parameters": {
        "rds_host": "host_2",
        "user_name": "user_2",
        "password": "password_2",
        "db_name": "db_2",
        "select_result.$": "$.select_result"
      },
    "End": true
    }
  }
}

DB接続城はキーバリュー形式でそのまま書くことができますが、select_taskの結果を参照するためには$を用います。
"select_result.$": "$.select_result"という部分ですが、キー部分にも$を含めないとバリューが変数として見なされず文字列としてTaskへ渡されてしまうので要注意です。

AWS Lambda関数

ステートマシンで定義した二つの関数です。

# select_task

def lambda_handler(event, context):
    rds_host = event['initial_params']['rds_host']
    user_name = event['initial_params']['user_name']
    password = event['initial_params']['password']
    db_name = event['initial_params']['db_name']

    # DBからデータをSELECTする処理
    # 結果をselect_dataへ格納

    response = {
        'select_data': select_data
    }
# insert_task

def lambda_handler(event, context):
    rds_host = event['rds_host']
    user_name = event['user_name']
    password = event['password']
    db_name = event['db_name']

    select_data = event['select_result']['select_data']

    # DBへINSERT処理

Parametersで設定した値はeventに格納されておりそのままのキーで読み込むことができます。
またevent['select_result']['select_data']でステートマシンで定義したキーで前のTaskの結果を読み込むことができます。

まとめ

Parametersで新しく設定する値自体は単純に読み込むことが可能ですが、前のTaskの結果を参照する場合には$を使う必要があるので要注意です。

参考

xp-cloud.jp