詳解 AWS Step Functions: プログラム言語のように記法を理解しよう

さとうです。

最近、AWS Step Functions(以後Step Functionsと呼びます)ばかり触っています。

使い込む中で、体系的に説明する記事を書きたくなったので書いてみたいと思います。

Step Functions職人を目指す方はぜひ参考にしてみてください。

※長めの記事なので、時間がある時にお読みください!

この記事の想定読者

  • これからStep Functionsを学習したい方
  • なんとなく触ったことはあるが体系的に理解したい方
  • 何らかのプログラム言語の学習経験がある方

はじめに: Step Functionsを学習する時に苦戦すること

Step Functionsは一言で表すとローコードでワークフローを組めるサービスです。

個人的には、ローコードでありながらも汎用性の高さが確保されている点が素晴らしいと思います。一方で汎用性を重視したゆえの初学者のとっつきにくさもあるとも思います。

個人的にとっつきにくいと感じる点は2点あります。そのため、この2点を解消することを念頭トピックを整理しています。

とっつきにくさ(1): 入出力の仕様がわかりにくい

Step Functionsは変数の定義や、命令間で入出力の受け渡しをすることができますが、それらは全てJSONを介して行われます。

JSONが持ちうるデータ型やクエリ言語(JSONata or JSONPath)を理解しながらそれら駆使してデータの操作をすることになりますので、GUIのプルダウンなどで直感的にデータが操作できるわけではありません。汎用性の高さをもたらすメリットがありますが、この点が1つの学習ハードルになっているように思います。

このようなStep Functions特有のデータフローに着目することでデータ操作に関する理解をクリアにしていきたいと思います。

とっつきにくさ(2): 制御構文がわかりにくい

Step FunctionsにはMap、Choice、Passなど独自の制御構文が存在しますが、最初は役割をイメージしづらいです。これらの制御をプログラムの一般的な制御(条件分岐、変数、ループ、関数など)に当てはめていくと理解がしやすくなると感じます。

そこでタイトルの如く、プログラム言語の入門書のようにStep Functionsの制御構文を説明していきたいと思います。

データフローから理解するStep Functionsの基礎

Step Functionsの全体像

Step Functionsの全体像を掴むために、まずは具体的にどんな機能を持つサービスなのかを考えてみたいと思います。

公式ドキュメントの図を引用しながら、基本的な用語を以下に整理します。

引用: Step Functions のステートマシンについて – AWS Step Functions

用語 意味
ステートマシン/ワークフロー(State Machine/Workflow) 複数のステートを組み合わせたワークフロー (IAMロール、ログ設定なども紐づく)
ステート(State) 各ステートマシンで実行される命令
フロー(Flow) ステートのうち、制御構文に相当するもの
タスク/アクション(Task/Action) ステートのうち、APIを使用した各AWSサービスの操作などに相当するもの
入力(Input) 前のステートから渡されたデータ(JSON)
実行入力(Execution Input) 入力のうち、ステートの実行時にユーザが渡す任意の引数を指す(JSON)
出力(Output) 次のステートへ渡すデータ(JSON)
変数(Variables) 出力のうち、どのステートからも参照できるように保存されたもの(JSON)

用語がたくさん出ますが、要するにStep Functionsは「JSONを入出力としてAWSのAPI操作をワークフローとしてオーケストレーションできるサービス」と考えると理解がシンプルになります。

上記の図をさらに単純化すると以下のようになり、つまるところJSONのバケツリレーでワークフローを作り上げているのです。

先ほど説明したとっつきにくさはこの仕組みに由来すると考えていますが、逆に言うと、JSONのバケツリレーという点さえ押さえればさほど難しいサービスではありません。

Step Functionsのデータ型と操作

Step Funtionsの入出力はJSONであることがわかりましたが、具体的にはJSONのRFCから以下のようなデータを持てることがわかります。

Booleanを持たせることもできますし、値をNumberで持たせて計算したりStringにキャストしたりすることもできますね。

{
    "String": "1",
    "Number": 1,
    "Boolean": true,
    "Array": ["1", "2"], 
    "Object": {"thisis": "Object"},
    "null": null
}

また、これらのデータに対してクエリ言語(JSONPath or JSONata)を使用することで入出力を操作したり加工する機能が用意されています。

ステートマシン単位でいずれかを選択する必要がありますが、今から作成する場合は後発でより汎用性の高いJSONataを選びましょう。この記事では以後、JSONataの使用を前提としてJSONPathの説明はしません。

docs.aws.amazon.com

Step Functionsでできること

「JSONを入出力とする」「JSONの操作や加工ができる」という特徴から、Step Functionsとしてできること・できないことがわかります。

  • できること
    • データのキャスト、加工、正規表現による置換などJSONataの組み込み関数でできる範囲のテキストベースの加工
    • 入力や加工したデータを使用したAWSサービスのAPI操作
  • できないこと
    • ファイルへの出力など、Step Functionsから直接ストレージへの書き込みが必要になる操作
    • 256KBを超えるテキストの入出力 ※仕様上の制約

Step FunctionsでできないことはAPIを呼び出してLambdaやEC2、Glueなどのサービスに処理を移譲してしまえばよいことになります。

例えば以下は、UNIX時刻で現在時刻を取得してタイムゾーンをJSTに設定した上で成型したタイムスタンプ文字列を返すステートマシンです。

▼サンプルコードを表示する
{
  "Comment": "A description of my state machine",
  "StartAt": "現在のUNIX時刻を取得",
  "States": {
    "現在のUNIX時刻を取得": {
      "Type": "Pass",
      "Next": "UNIX時刻をJSTでフォーマット",
      "Output": {
        "TimeStamp": "{% $millis() %}"
      }
    },
    "UNIX時刻をJSTでフォーマット": {
      "Type": "Pass",
      "End": true,
      "Output": {
        "JstTimeStamp": "{% $fromMillis($states.input.TimeStamp, '[M01]/[D01]/[Y0001] [h#1]:[m01][P]', '+0900') %}"
      }
    }
  },
  "QueryLanguage": "JSONata"
}

Step Functionsの編集画面左上のボタンから「コード」を選択して下記のコードをコピペするとお手元で同一のステートマシンが作成可能です。

このようなJSONで持つことができるデータの操作であれば、Step Functions内で完結することができます。

Date/Time functions · JSONata

実行結果は以下のようになり、JSTでフォーマットされていることがわかります。

プログラム言語のように理解するStep Functionsの記法

変数

概要

各タスクの「変数」タブから、変数をJSON形式で定義することができます。

先述した通り、Step Fuctionsはタスク間でJSONの入出力を使ってデータの受け渡しをすることが基本です。

先ほどの図でステートAの出力をステートCの入力として使用したいとしましょう。

変数を使わない場合、ステートA→B、B→Cのように出力を連鎖させる必要があります。

しかし変数機能を使って出力を変数に保存しておくことで、離れたタスクから出力を使用せずとも参照することができます。

参照時には先頭に変数であることを示す $ をつけます。

Tips

文字列展開について

JSONataでは、&演算子を使うと以下のように変数を文字列内で展開できます。

サンプルコードもご参照ください。

{% '<文字列>' & $variable %}

予約変数($states)について

$states という予約変数が存在します。

先ほどステート間でJSONの入出力の受け渡しをすると説明しましたが、基本的にはこの変数を介して入出力にアクセスしています。

  • $states.input: ステートに渡された入力(先頭の場合は実行入力、それ以外は前のステートの出力になる)
  • $states.result: ステートの実行結果に関する情報
  • $states.errorOutput: エラーに関する情報(エラー発生時のみ)
  • $states.context: ステートマシンのcontextに関する情報(後述)

docs.aws.amazon.com

contextについて

予約変数の $states が返す情報の1つに context というものがあります。

docs.aws.amazon.com

ドキュメントに記載の通り context はオブジェクト構造を持ちますが、基本的にはステートマシンの実行に関するメタデータが含まれています。

実行内容(実行時間、実行IDなど)を通知やメッセージで使用したい時などに活用することができます。

最も典型的な用途は、実行入力へのアクセスです。$states.context.Execution.Input.<実行入力の要素名のパス> でどのステートからも実行入力にアクセスすることができますので、この点はぜひ覚えておきましょう。

実行入力とは、ステートマシンの実行時に渡される任意の入力(≒引数)のことです。以下のように実行入力は予約変数の $state.context.Execution.Input に格納され、ステートマシン内のどのステートからも自由にアクセスすることができます。

{ “message”: “Hello.”}という実行入力を渡して実行した場合

サンプルコード

実行入力を context から取得し、その内容を変数として宣言するコードです。

実行時に {"Oyatsu": "<任意のテキスト>"} を実行入力として使用します。

実行入力はマネジメントコンソールから「実行を開始」を押すと入力する画面が出てきますのでそちらから指定します。

▼サンプルコードを表示する
{
  "StartAt": "変数宣言",
  "States": {
    "変数宣言": {
      "Type": "Pass",
      "Assign": {
        "MeesageTemplate": "{% \"今日のおやつは「\" & $states.context.Execution.Input.Oyatsu & \"」です。\" %}"
      },
      "Next": "変数展開"
    },
    "変数展開": {
      "Type": "Pass",
      "End": true,
      "Output": {
        "Output": "{% $MeesageTemplate %}"
      }
    }
  },
  "QueryLanguage": "JSONata"
}

条件分岐

概要

処理を条件で分岐させたい時にはフローの1つであるChoiceを使用します。

プログラム的に言うと1つ目のルールがif、2つ目以降のルールがelse if、Default Rule(いずれのルールにも合致しなかった場合の分岐)がelseに相当します。

ConditionはJSONataで記載する必要があり、TrueまたはFalseを返す必要があります。

比較演算子を使用して条件式を組み立てることができます。

Comparison Operators · JSONata

Boolean Operators · JSONata

また、以下のようなBool関数を使用することもできます。 $exists() を使うと変数の有無で条件分岐が可能です。

Boolean functions · JSONata

Tips

Conditionの組み立てについて

ルール内の 条件を編集 をクリックするとGUIで条件式をビルドすることができます。

組み立てに自信がなければ使ってみましょう。

コメントについて

ルール内のコメントを記載するとGUIのワークフローに反映されるという小ネタがあります。

ワークフローの視認性が少しだけよくなります。

サンプルコード

変数 $variable の有無をチェックするコードです。

▼サンプルコードを表示する
{
  "Comment": "A description of my state machine",
  "StartAt": "variable変数を設定",
  "States": {
    "variable変数を設定": {
      "Type": "Pass",
      "Next": "Choice",
      "Assign": {
        "variable": "test"
      }
    },
    "Choice": {
      "Type": "Choice",
      "Choices": [
        {
          "Next": "trueの場合の操作",
          "Condition": "{% $exists($variable) %}",
          "Comment": "Trueの場合"
        }
      ],
      "Default": "falseの場合の操作"
    },
    "trueの場合の操作": {
      "Type": "Pass",
      "End": true,
      "Output": {
        "SetVariable": true
      }
    },
    "falseの場合の操作": {
      "Type": "Pass",
      "End": true,
      "Output": {
        "SetVariable": false
      }
    }
  },
  "QueryLanguage": "JSONata"
}

反復処理(ループ/イテレーション)

概要

ループ(for)とイテレーション(foreach)に分けて説明します。

ループ

残念ながらStep Functionsの機能としては用意されていません。

Choiceを応用することでループ処理を自前で実装することが可能です。

以下のようにカウンタがループの終了条件に合致しない場合はカウンタを加算してChoice処理に戻ることで、ループ処理になります。サンプルもご参照ください。


イテレーション

こちらもStep Functionsの機能としては用意されていません。

しかし、フローの1つであるMapを応用するとイテレーション処理の実装が可能です。

docs.aws.amazon.com

具体的にはMapをインラインモードで入力に配列を提供し、同時実行数を1に制限することで配列の各要素に対して1件ずつ順次Map内に定義した処理を実行するようになり、イテレーションと同等の動作をさせることができます。

※Mapは本来、並列で反復処理を実行するためのフローです。詳細は後述します。

出力または変数で順次処理をさせたい要素(配列)を提供し、Map内で処理を定義します。

渡した要素は $states.inputで取得できますが、Map内の先頭でしか取得できないため離れたステートで使用する場合は先頭で変数宣言をする必要があります。

詳細はサンプルコードを実際に動かして確認してみてください。

Tips

変数スコープについて

イテレーションにMapを使用する場合、変数スコープに癖があるので注意が必要です。

ドキュメントでは workflow-local scope と説明されていますが、Map内は各要素ごとに独立した変数スコープを持ち、Mapの外やMap内の別の要素からMap内の変数にアクセスすることができません。ただし、Map内からMapの外で定義された変数や states 変数にアクセスすることは可能です。

docs.aws.amazon.com

図にすると以下のようになり、赤色で表現した変数の参照ができない仕様です。

←Map内からの参照 | Map外からの参照→

この仕様から、Map内の処理結果をMap外で使用したい場合は変数を使うことができません。出力を使用する必要があります。

Map末尾のステートの出力はMapの各要素ごとにマージされ最終的には配列で出力される仕様になっているため、この出力を使用することができます。

各要素の出力が配列に格納され、Mapの外から1つの配列として取得できる

サンプルコード

説明の例示に使用したサンプルコードです。

▼サンプルコードを表示する

ループ

{
  "Comment": "A description of my state machine",
  "StartAt": "カウンタを設定",
  "States": {
    "カウンタを設定": {
      "Type": "Pass",
      "Next": "Choice",
      "Output": {
        "Counter": 0
      },
      "Assign": {
        "Counter": 0
      }
    },
    "Choice": {
      "Type": "Choice",
      "Choices": [
        {
          "Condition": "{% $Counter <= 3 %}",
          "Comment": "Trueの場合",
          "Next": "ループ中の処理"
        }
      ],
      "Default": "ループ終了"
    },
    "ループ終了": {
      "Type": "Pass",
      "End": true
    },
    "ループ中の処理": {
      "Type": "Pass",
      "Next": "カウンタを加算"
    },
    "カウンタを加算": {
      "Type": "Pass",
      "Output": {
        "SetVariable": true
      },
      "Assign": {
        "Counter": "{% $Counter + 1 %}"
      },
      "Next": "Choice"
    }
  },
  "QueryLanguage": "JSONata"
}

イテレーション

{
  "StartAt": "要素を設定",
  "States": {
    "要素を設定": {
      "Type": "Pass",
      "Output": {
        "Counter": 0
      },
      "Assign": {
        "Items": [
          "りんご",
          "ばなな",
          "ぶどう"
        ]
      },
      "Next": "Map"
    },
    "Map": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "INLINE"
        },
        "StartAt": "メッセージを出力",
        "States": {
          "メッセージを出力": {
            "Type": "Pass",
            "End": true,
            "Output": {
              "Message": "{% \"フルーツは「\" & $states.input & \"」です。\" %}"
            }
          }
        }
      },
      "Items": "{% $Items %}",
      "MaxConcurrency": 1,
      "Next": "各要素のメッセージを取得"
    },
    "各要素のメッセージを取得": {
      "Type": "Pass",
      "End": true,
      "Output": {
        "Messages": "{% $states.input.Message %}"
      }
    }
  },
  "QueryLanguage": "JSONata"
}

関数

概要

ユーザー関数と組み込み関数に分けて説明します。

ユーザー関数

Step Functionsの機能としては用意されていません。

しかしステートマシンから別のステートマシンを呼び出すような入れ子構成にすることが可能なので、ステートマシンレベルで汎用化して他の複数のステートマシンから呼び出すようなモジュール化は可能です。

またこれまで説明してきたように、他のLambdaやAWS Batchなどの他のサービスを呼び出すことができるという特性から、そちらをモジュールとして使うような作り込みも可能になります。

モジュール化を考える時にはなんとなく万能なLambdaを使いたくなりますが、管理対象が増える分だけ構成も複雑になるのでまずは後述する組み込み関数も駆使しながら、ステートマシンのモジュール化で実装したい処理が実現できるか考えてみるとよいでしょう。

組み込み関数

JSONataの組み込み関数を使用した入出力の加工が可能です。

JSONata Documentation · JSONata

ドキュメントの Function Library にある通り、多彩な関数が用意されています。

  • String Functions
  • Numeric Functions
  • Aggregation Functions
  • Boolean Functions
  • Array Functions
  • Object Functions
  • Date/Time Functions
  • Higher Order Functions

Tips

同期処理・非同期処理について

ユーザー関数としての別のステートマシンや外部サービスを呼び出す場合に気にするのが同期・非同期の制御可否だと思います。

結論としてはどちらも可能で、サービス統合という機能があるのでこちらを使うことで実現可能です。

blog.serverworks.co.jp

上記の記事でいうところの Request Responseパターンは非同期処理にあたり、呼び出し先の処理完了を待たずに次の処理へ進みます。

一方で Run a Jobパターン(.sync) は同期処理にあたり、呼び出し先の処理完了を待って次の処理へ進みます。

これまで紹介してきたLambdaやStep Functionsはこの機能が使用可能で、設定はステート定義の 追加の設定 からラジオボタンを 子ワークフロー実行が完了するまで待ちます に設定するだけです。ちなみに非同期処理にしたい場合は 呼び出して続行 にします。

しかしこの Run a Jobパターン(.sync)全てのサービスに対応しているわけではありません。対応していないサービスはデフォルトで非同期処理になっており、処理を待つことができません。

そのようなサービスについては、Choiceを応用して処理結果を定期的にポーリングで取得するステートマシンの作り込みが必要になります。

例えば以下は、Run a Jobパターンに対応していないRedshift Data APIにSQLを同期的に実行させたい場合のステートマシンの実装サンプルです。
無限ループに陥らないようにカウンタで最大試行回数を制御したり、想定外の結果を返した場合は例外に投げて終了させるなどの考慮は必要になります。

* 処理の流れ

  1. Redshift Data: ExecuteStatementでSQLを実行し、発行されたクエリIDを取得
  2. 一定期間(サンプルでは10秒)待機してから、 Redshift Data: DescribeStatement でクエリの実行結果を取得(ポーリング処理)
  3. ポーリングの結果が FINISHED であれば処理完了とみなし処理終了、STARTED などの処理中を示すステータスの場合は再度2に戻る

※返しうるステータスはAPIリファレンスを見ながら確認しましょう。大変ですが…。

DescribeStatement – Amazon Redshift Data API


呼び出し元から他サービスを呼び出す場合のIAMロールについて

別のステートマシンや他のサービスを呼び出す場合は、呼び出し元のIAMロールで呼び出し先サービスのAPIオペレーションに対する許可が必要です。

例えば別のステートマシンを呼び出す場合に呼び出し元のステートマシンのIAMロールで必要な許可は以下になります。内部仕様の都合か、ステートマシンを呼び出す場合にはEventBridgeの許可が必要になります。

マネジメントコンソールで新規に作成する場合は自動で必要な権限が追加されますが、2回目以降の更新では反映されないため注意しましょう。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "states:StartExecution"
            ],
            "Resource": [
                "<呼び出すステートマシンのARN>"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "states:DescribeExecution",
                "states:StopExecution"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "events:PutTargets",
                "events:PutRule",
                "events:DescribeRule"
            ],
            "Resource": [
                "arn:aws:events:ap-northeast-1:XXXXXXXXXXXX:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule"
            ]
        }
    ]
}

サンプルコード

Step Functionsの入れ子構成および例示したSyncパターンのセルフ実装のサンプルです。

▼サンプルコードを表示する

ユーザー関数として呼び出されるステートマシン(子)

{
  "Comment": "A description of my state machine",
  "StartAt": "実行入力からメッセージを取得",
  "States": {
    "実行入力からメッセージを取得": {
      "Type": "Pass",
      "End": true,
      "Output": {
        "Message": "{% '渡されたメッセージ: ' & $states.context.Execution.Input.Message %}"
      }
    }
  },
  "QueryLanguage": "JSONata"
}

ユーザー関数を同期処理で呼び出すステートマシン(親)

{
  "StartAt": "Step Functions StartExecution",
  "States": {
    "Step Functions StartExecution": {
      "Type": "Task",
      "Resource": "arn:aws:states:::states:startExecution.sync:2",
      "Arguments": {
        "StateMachineArn": "<子のステートマシンのARN>",
        "Input": {
          "Message": "Hello From Parent."
        }
      },
      "End": true
    }
  },
  "QueryLanguage": "JSONata"
}

ポーリング処理をして処理完了を待つステートマシン

{
  "StartAt": "クエリ実行",
  "States": {
    "クエリ実行": {
      "Type": "Task",
      "Arguments": {
        "Database": "redshift-databasename}",
        "ClusterIdentifier": "redshift-cluster-identifier",
        "Sql": "sql"
      },
      "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
      "Next": "待機(10秒)",
      "Output": {
        "Id": "{% $states.result.Id %}"
      }
    },
    "待機(10秒)": {
      "Type": "Wait",
      "Seconds": 10,
      "Next": "ポーリング処理",
      "Output": {
        "Id": "{% $states.input.Id %}"
      }
    },
    "ポーリング処理": {
      "Type": "Task",
      "Arguments": {
        "Id": "{% $states.input.Id %}"
      },
      "Resource": "arn:aws:states:::aws-sdk:redshiftdata:describeStatement",
      "Next": "ポーリング結果判定",
      "Output": {
        "Id": "{% $states.input.Id %}",
        "Status": "{% $states.result.Status %}",
        "HasResultSet": "{% $states.result.HasResultSet %}",
        "ResultRows": "{% $states.result.ResultRows %}",
        "ResultSize": "{% $states.result.ResultSize %}"
      }
    },
    "ポーリング結果判定": {
      "Type": "Choice",
      "Choices": [
        {
          "Next": "待機(10秒)",
          "Condition": "{% $states.input.Status="SUBMITTED" or $states.input.Status="STARTED" or $states.input.Status="PICKED" %}",
          "Output": {
            "Id": "{% $states.input.Id %}",
            "Status": "{% $states.input.Status %}"
          }
        },
        {
          "Next": "成功",
          "Condition": "{% $states.input.Status="FINISHED"%}",
          "Output": {
            "Id": "{% $states.input.Id %}",
            "Status": "{% $states.input.Status %}",
            "HasResultSet": "{% $states.input.HasResultSet %}",
            "ResultRows": "{% $states.input.ResultRows %}",
            "ResultSize": "{% $states.input.ResultSize %}"
          }
        }
      ],
      "Default": "UnexpectedStatusError",
      "Output": {
        "Id": "{% $states.input.Id %}",
        "Status": "{% $states.input.Status %}",
        "Error": "",
        "HasResultSet": "{% $states.input.HasResultSet %}",
        "ResultRows": "{% $states.input.ResultRows %}",
        "ResultSize": "{% $states.input.ResultSize %}"
      }
    },
    "UnexpectedStatusError": {
      "Type": "Fail",
      "Error": "UnexpectedStatusError",
      "Cause": "{% 'the commands exited unexpected status:' & $states.input.Status %}"
    },
    "成功": {
      "Type": "Succeed",
      "Output": {
        "Id": "{% $states.input.Id %}",
        "Status": "{% $states.input.Status %}",
        "Error": "",
        "HasResultSet": "{% $states.input.HasResultSet %}",
        "ResultRows": "{% $states.input.ResultRows %}",
        "ResultSize": "{% $states.input.ResultSize %}"
      }
    }
  },
  "QueryLanguage": "JSONata"
}

並列処理

概要

並列処理にはフローのMapまたはParallelを使用することができます。

各要素に対して反復で並列処理をしたい場合

Mapを使用します。

使い方は基本的にイテレーション処理で紹介したMapの使い方と同じで、最大同時実行数を設定しなければ並列で処理されるようになります。

なお、最大同時実行数は40となり、より多く同時実行したい場合は分散モードを使用する必要があります。

要素は固定で並列処理をしたい場合

Parallelを使用します。

使い方自体は並列実行したい要素を並べるだけなのでそれほど難しくはありません。

Mapで説明した内容と同様に並列実行内の末尾の出力はマージされるのと、変数スコープは並列実行している分岐ごとに固有となるので注意してください。

Tips

Mapのインラインモードvs分散モード

両者の違いはドキュメントでも記載がありますが、制約に抵触しない限りはインラインモードの使用が推奨されています。

docs.aws.amazon.com

分散モードは入力が大規模もしくは同時実行数が膨大になる処理向けのモードですが、256KiBを超えるテキストの入力や40件以上の並列処理が必要になるケースは稀でしょう。

分散モードの場合、並列処理は別々のステートマシンで分散実行される形式になるため、処理の追跡が煩雑になってしまうデメリットがあることも留意しておきましょう。

Step Functions での分散マップ実行の表示 – AWS Step Functions

サンプルコード

MapとParallelそれぞれのサンプルです。

▼サンプルコードを表示する

Map

{
  "StartAt": "要素を設定",
  "States": {
    "要素を設定": {
      "Type": "Pass",
      "Assign": {
        "Items": [
          "りんご",
          "ぶどう",
          "ばなな"
        ]
      },
      "Next": "Map"
    },
    "Map": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "INLINE"
        },
        "StartAt": "並列処理",
        "States": {
          "並列処理": {
            "Type": "Pass",
            "Output": {
              "Message": "{% $states.input %}"
            },
            "End": true
          }
        }
      },
      "Next": "並列処理のメッセージを取得",
      "Items": "{% $Items %}"
    },
    "並列処理のメッセージを取得": {
      "Type": "Pass",
      "End": true,
      "Output": {
        "Messages": "{% $states.input.Message %}"
      }
    }
  },
  "QueryLanguage": "JSONata"
}

Parallel

{
  "StartAt": "要素を設定",
  "States": {
    "要素を設定": {
      "Type": "Pass",
      "Assign": {
        "Item": "りんご"
      },
      "Next": "Parallel"
    },
    "Parallel": {
      "Type": "Parallel",
      "Next": "並列処理のメッセージを取得",
      "Branches": [
        {
          "StartAt": "並列処理(1)",
          "States": {
            "並列処理(1)": {
              "Type": "Pass",
              "Output": {
                "Message": "{% $Item %}"
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "並列処理(2)",
          "States": {
            "並列処理(2)": {
              "Type": "Pass",
              "Output": {
                "Message": "{% $Item %}"
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "並列処理(3)",
          "States": {
            "並列処理(3)": {
              "Type": "Pass",
              "Output": {
                "Message": "{% $Item %}"
              },
              "Assign": {
                "hoge": "hoge"
              },
              "End": true
            }
          }
        }
      ]
    },
    "並列処理のメッセージを取得": {
      "Type": "Pass",
      "End": true,
      "Output": {
        "Messages": "{% $states.input.Message %}"
      }
    }
  },
  "QueryLanguage": "JSONata"
}

例外処理

概要

ステートの エラー処理 タブから再試行およびエラーキャッチを定義することができます。

Erros にキャッチしたいエラー内容を定義するまでは同一です。

再試行

以下の再試行条件を設定することができます。
エクスポネンシャルバックオフな実装ができますが、手動で再試行をトリガーするといった操作はできないので注意しましょう。

  • Interval: 初回の再試行開始までの待機時間(秒)
  • Max attempts: 最大試行回数
  • Backoff rate: 2回目以降の再試行までの待機時間(秒)
  • 最大遅延秒数: 再試行の間に許容される最大待機時間(秒)

エラーキャッチ

Fallback state にエラーが発生した時の処理を定義します。

エラー時の処理内容の組み立て方法は正常時と同様です。

Tips

例外処理が定義できないステート

PassステートおよびWaitステートは例外処理を定義することができません。

PassやWaitでJSONataを使用した文字列加工などの失敗しうる処理を実装している場合は注意しましょう。

キャッチできるエラーについて

Erros にキャッチしたいエラーを定義することができますが、 S3.NoSuchBucketException などAPIが返す内容も含めて特定の例外クラスを指定してエラーをキャッチすることが可能です。

States.ALL とした場合には全てのエラーがキャッチされます。

サンプルコード

再試行とエラーキャッチの実装サンプルです。エラーが発生した場合のみPassステートが実行されます。

▼サンプルコードを表示する

再試行

{
  "StartAt": "Step Functions StartExecution",
  "States": {
    "Step Functions StartExecution": {
      "Type": "Task",
      "Resource": "arn:aws:states:::states:startExecution.sync:2",
      "Arguments": {
        "StateMachineArn": "arn:aws:states:REGION:ACCOUNT_ID:stateMachine:STATE_MACHINE_NAME",
        "Input": {
          "StatePayload": "Hello from Step Functions!",
          "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID": "error_test"
        }
      },
      "End": true,
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "BackoffRate": 2,
          "IntervalSeconds": 1,
          "MaxAttempts": 3
        }
      ]
    }
  },
  "QueryLanguage": "JSONata"
}

エラーキャッチ

{
  "StartAt": "Step Functions StartExecution",
  "States": {
    "Step Functions StartExecution": {
      "Type": "Task",
      "Resource": "arn:aws:states:::states:startExecution.sync:2",
      "Arguments": {
        "StateMachineArn": "arn:aws:states:REGION:ACCOUNT_ID:stateMachine:STATE_MACHINE_NAME",
        "Input": {
          "StatePayload": "Hello from Step Functions!",
          "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID": "error_test"
        }
      },
      "End": true,
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "エラー発生時の処理を定義"
        }
      ]
    },
    "エラー発生時の処理を定義": {
      "Type": "Pass",
      "End": true
    }
  },
  "QueryLanguage": "JSONata"
}

ユースケース

汎用性の高さから可能性も無限大のStep Functionsですが、作例の1つとしてデータエンジニアリングの領域でETLジョブをStep Functionsで作成する場合を紹介します。

ETLジョブの組み立てについてはGlue ETL Jobという専用サービスが存在するものの、入れ子構成などの実運用を踏まえた汎用性はStep Functionsに軍配が上がります(詳細は以下のブログを参照してください)。

blog.serverworks.co.jp

以下は BronzeSilverGold と、データの処理段階を3段階に分けて入れ子構成のETLジョブをStep Functionsで実装する場合のイメージ図です。

今回ご紹介したStep Functionsの記法を使うことで、Glue ETL Jobよりも詳細な作り込みが可能になります。

  • 親/子ジョブ層: 同期/非同期やエラーキャッチなど、孫ジョブの実行順序や例外処理を取り決めて制御する
  • 孫ジョブ層: 変数、条件分岐、反復処理を駆使してETL処理を作り込み、処理を関数化(モジュール化)する

上記のようにAWSのサービスを組み合わせた処理であれば、ほかにもAIチャットボットの制御など様々なユースケースに対応することが可能です。

利用料金の考え方

最後に利用料金についても触れておきたいと思います。

まず、無限ループなどの実装ミスにさえ気を付ければ、Step Functions自体は非常に安価なサービスです。

StandardとExpressの場合で料金が異なりますが、ここでは一般的に使用されるStandardを前提にします(Express自体の説明は、また別の記事で)。

aws.amazon.com

Standardの場合、「状態遷移」の回数に対して料金が発生する仕組みです。状態遷移には4,000回/月の無料利用枠があります。

ここでいう「状態」は、ステート(State)の意味です。

状態遷移の回数は実行結果から確認することができ、例えばPassを1回実行するだけのステートマシンでは3回でした。

例えば以下の結果からは状態遷移が3であることがわかります。

「1つのステートしか実行していないのだから1では?」と思うかもしれませんが、これはステートにStartとEndが含まれているためです。

1つのPassを実行するために StartPassEnd と3つのステートに遷移しているので3とカウントされているわけですね。

基本的には再試行なども含めて Start から End までに通過したステートの数でカウントされますので、4,000回という無料利用枠はそれなりに闞沢であることがイメージできると思います。

無料枠を超えた従量課金も0.025USD/1,000回なので、つまり4,000回の状態遷移につき1USDです。

おわりに

マッチョな文量になりました🫠

Step Functionsでワークフローを開発するイメージは持てましたでしょうか?

年々アップデートを重ねて使い勝手が良くなっていることを実感できるサービスだと思っています。ブランクがある方も機会があればぜひ使ってみましょう。

佐藤 航太郎(執筆記事の一覧)

エンタープライズクラウド部 クラウドモダナイズ課
2025年1月入社で何でも試したがりの雑食系です。




Source link

関連記事

コメント

この記事へのコメントはありません。