Microsoft Agent Framework開発入門 #Azure – Qiita

概要

Microsoft Agent Framework は、Microsoft が OSS として提供する AI エージェント & マルチエージェントワークフローを開発する フレームワークです。

Microsoft の既存エージェントフレームワークである、Semantic KernelAutoGen のメリットを統合し、同じチームによって構築された次世代のエージェント開発基盤として位置づけられています。

image.png

本フレームワークは、大きく以下のカテゴリによって構成されています。

カテゴリ 内容
Agents LLM による意思決定・ツール実行・応答生成を担う個別エージェント。Azure や OpenAI など複数のモデルプロバイダーをサポート。
Workflows 複数のエージェントや関数をグラフ構造でつなぎ、ワークフローを定義。型安全なルーティング / 入れ子 / Human-in-the-Loop / チェックポイント再開 などをサポート。

エージェント抽象化によるメリット

Agent Framework のすべてのエージェントは BaseAgent を継承する(=AgentProtocol を満たす)ことが共通要件になっています。

この設計により、エージェントとしての振る舞いが統一され、Azure AI Foundry Agent や Copilot Studio Agent のような異なる実装基盤のエージェントでも、同じインターフェイスで扱えることが保証されています。

つまり、バックエンド差異を抽象化し、安全にマルチエージェント・オーケストレーションへ載せられることが最も大きな価値です。

ms_agent_fs_class_01.png

AgentProtocol

AgentProtocol には「エージェントとして扱うために必須となる挙動」が定義されています。

種別
プロパティ id, name, display_name, description
メソッド run(...), run_stream(...), get_new_thread(...)

BaseAgent

BaseAgent は、AgentProtocol を満たすエージェントの「共通基盤」です。

  • スレッド/履歴管理(AgentThread の互換実装)
  • ツール化(.as_tool()
  • ミドルウェア
  • 観測・トレース など

開発者が直接触れることは少ないですが、どのエージェントでも同じように動く ことを支えているのがこのレイヤーです。

ChatAgent

ChatAgentBaseAgent の汎用実装であり、最も利用頻度の高いクラスです。
chat_clientChatClientProtocol 準拠のクライアント(Azure / OpenAI など)を差し込むだけでエージェント化できます。

from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient

agent = ChatAgent(
    chat_client=AzureOpenAIChatClient(...),
    name="assistant",
    instructions="Be helpful and concise.",
    tools=[...],  # 任意:関数/MCPTool など
)

ワークフロー統合

AgentProtocol に準拠していれば そのままワークフロー上のノード(Executor)として接続できます。

そのため、

  • Azure AI Foundry で作成したエージェント
  • Chat Completion API ベースのエージェント
  • Responses API ベースのエージェント など

のように実装元やランタイムが異なっていても、フレームワーク全体で統一的なマルチエージェント・オーケストレーションを実現できます。

概要

Agent Framework のエージェントは、LLM の推論でユーザー意図を解釈し、必要に応じてツールを呼び出して自律的にタスクを解決します。

以下はシンプルな実装例です。

import asyncio
import os

from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient


# ---------- 関数の定義 ----------
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: 72°F and sunny"

# ---------- エージェントの作成 ----------
agent = ChatAgent(
    name="WeatherAgent",
    chat_client=AzureOpenAIChatClient(...),
    instructions = "あなたは優れたアシスタントです。",
    tools=[get_weather]
)


# ---------- エージェントの実行 ----------
async def main():

    response = await agent.run("パリの気温は?")
    print(f"Assistant: {response}")

asyncio.run(main())


# ---------- 出力例 ----------
# Assistant: パリの現在の気温は約22°C(72°F)で、晴れています。

ツール

エージェントが利用できるツールには大きく、関数型ツールホスト型ツールが利用できます。

区分 説明 典型用途
関数型ツール Python 関数などをツール化し、LLM が直接呼び出す形式。 カスタム処理・ローカル実行
ホスト型ツール Web検索・コード実行・ファイル検索・MCP など、サービス側に組み込まれた Build-in ツールをそのまま利用。 開発・運用負荷を下げたい場合

ホスト型ツール

エージェントクライアントの サービス側で提供されている組み込み済み(Build-in) ツールをそのまま利用します。例として、Web 検索やコード実行、ファイル検索、MCP 連携などがあります。

認証・実行・スケーリングなどをサービス側が担うため、クライアント実装が最小限で済みます。

from agent_framework import ChatAgent, HostedCodeInterpreterTool
from agent_framework.azure import AzureOpenAIResponsesClient

# ---------- ホスト型コード実行ツールを利用 ----------
agent = ChatAgent(
    name="CodeInterpreterAgent",
    chat_client=AzureOpenAIResponsesClient(...),
    instructions="...",
    tools=HostedCodeInterpreterTool(),
)

Agent as Tool

既存エージェントを 「ツール化」 して、別のエージェントから呼び出す機能です。

これにより、専門家エージェントを関数ツールのように使うことができ、シンプルなマルチエージェント構成を迅速に構築できます。

from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient

# ---------- 子エージェントの構築 & ツール化 ----------
specialist = ChatAgent(
    name="WeatherExpert",
    chat_client=AzureOpenAIChatClient(...),
    instructions="天気だけに答える専門家。"
)

weather_tool = specialist.as_tool(...)

# ---------- 親エージェントの構築 ----------
manager = ChatAgent(
    name="Manager",
    chat_client=AzureOpenAIChatClient(...),
    instructions="質問を分解し、必要ならツールで解決する。",
    tools=[weather_tool], # ← ツールとして指定
)

MCP 対応

Azure Framework では、MCP ツールにも対応しています。

from agent_framework import ChatAgent, MCPStreamableHTTPTool
from agent_framework.azure import AzureOpenAIChatClient

mcp = MCPStreamableHTTPTool(
    name="Microsoft Learn MCP",
    url="https://learn.microsoft.com/api/mcp",
)

agent = ChatAgent(
    name="DocsAgent",
    chat_client=AzureOpenAIChatClient(...),
    instructions="必要に応じて MCP で社内ドキュメントを検索して答える。",
    tools=[mcp],
)

ミドルウェア

ミドルウェアは、エージェント実行の各段階に介入して関数を実行することができます。これにより、コアロジックを変更せずに次のような横断的機能を実装できます。

  • ログ記録(処理開始・終了など)
  • セキュリティ/入力検証(不正リクエストの遮断)
  • 結果変換(レスポンスの整形、フィルタリング、上書き)

Agent Framework には、処理の粒度に応じて 3 種類のミドルウェアが用意されています。

種別 介入ポイント 主用途
Agent Run の開始〜終了 実行計測、キャッシュ回答、全体ポリシー
Chat LLM 呼出し前後 PII マスク、プロンプト強化、差し替え
Function ツール呼出し前後 引数検証、ホワイト/ブラックリスト、承認
サンプルコード
import asyncio
import os

from agent_framework import ChatAgent
from agent_framework.azure import AzureOpenAIChatClient

from typing import Callable, Awaitable
from agent_framework import AgentRunContext, FunctionInvocationContext, ChatContext


# ---------- エージェントミドルウェア定義 ----------
async def logging_agent_middleware(
    context: AgentRunContext,
    next: Callable[[AgentRunContext], Awaitable[None]],
) -> None:
    """Agent実行の前後でログ出力"""
    print("> Before Agent Execution")
    await next(context)
    print("> After Agent Execution")

# ---------- チャットミドルウェア定義 ----------
async def logging_chat_middleware(
    context: ChatContext,
    next: Callable[[ChatContext], Awaitable[None]],
) -> None:
    """LLM呼び出し前後でログ出力"""
    print(f"  > Before LLM Call. Messages: {len(context.messages)}")
    for m in context.messages:
        print(f"    - {m.role}: {m.text}")  
    await next(context)
    print("  > After LLM Call")

# ---------- 関数ミドルウェア定義 ----------
async def logging_function_middleware(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]],
) -> None:
    """ツール呼び出し前後でログ出力"""
    print(f"    > Before Function Call: {context.function.name}")
    await next(context)
    print("    > After Function Call")

# ---------- 関数定義 ----------
def get_weather(location: str) -> str:
    """Get weather for a location."""
    print("        - Executing get_weather function.")
    return f"Weather in {location}: 72°F and sunny"

# ---------- エージェントの作成 ----------
agent = ChatAgent(
    name="WeatherAgent",
    chat_client=AzureOpenAIChatClient(
        api_key=API_KEY,
        endpoint=ENDPOINT,
        deployment_name="gpt-4.1",
    ),
    instructions = "あなたは優れたアシスタントです。",
    tools=[get_weather],
    middleware=[
        logging_agent_middleware,
        logging_function_middleware,
        logging_chat_middleware,
    ],
)

# ---------- エージェントの実行 ----------
async def main():

    response = await agent.run("パリの気温は?")
    print(f"Assistant: {response}")

asyncio.run(main())


# ---------- 出力例 ----------
# > Before Agent Execution
#   > Before LLM Call. Messages: 2
#     - system: あなたは優れたアシスタントです。
#     - user: パリの気温は?
#   > After LLM Call
#     > Before Function Call: get_weather
#         - Executing get_weather function.
#     > After Function Call
#   > Before LLM Call. Messages: 4
#     - system: あなたは優れたアシスタントです。
#     - user: パリの気温は?
#     - assistant:
#     - tool:
#   > After LLM Call
# > After Agent Execution
# Assistant: パリの気温は摂氏約22°C(華氏72°F)で、晴れています。

メモリ

Agent Framework では、エージェントの記憶において、 短期記憶(スレッド/会話履歴)長期記憶(嗜好・知識の持ち越し) の二層を標準提供します。これにより、会話継続パーソナライズを両立します。

image.png

短期記憶

短期記憶は、AgentThread として管理され、同じスレッドを渡し続けることで「前の発話やツール実行結果」を踏まえた応答が可能です。

# ---------- 新規スレッドを作成 ----------
thread = agent.get_new_thread()

# ---------- 1 ターン目 ----------
response = await agent.run("日本の首都は?", thread=thread)
print(response)  # 例: 東京です。

# ---------- 2 ターン目 ----------
response = await agent.run("その人口は?", thread=thread)
print(response)  # 例: 東京都の推計人口は約1,400万人です。

長期記憶

長期記憶はスレッドを跨いで残す知識を扱います。
ContextProvider で構成され、保管・管理は、AgentThread 内の context_provider(=AggregateContextProvider)に紐づきます。
エージェント実行の 前後 にメソッドを差し込んで読み書きします。

image.png

タイミング メソッド 役割
エージェント実行 invoking() 記憶から instructions / messages / tools を組み立て、LLM へ渡す追加のコンテキストを注入
エージェント実行 invoked() 会話から学習すべき情報を抽出し保存
サンプルコード

以下は「ユーザーが好きな都市」を学習し、次回の回答で活用する実装例です。

from agent_framework import ContextProvider, Context

# ---------- コンテキストプロバイダー定義 ----------
class UserPrefMemory(ContextProvider):
    def __init__(self):
        self.pref_city = None  # ユーザー嗜好のストレージ(例:好きな都市)

    # 実行"前" : 記憶をLLMに注入
    async def invoking(self, messages, **kwargs) -> Context:
        if self.pref_city:
            return Context(
                instructions=f"ユーザーは {self.pref_city} が好き。回答で優先的に触れること。"
                # instructions だけでなく messages / tools も追加可能 
            )
        return Context()

    # 実行"後" : 新しい嗜好の検出 → 記憶
    async def invoked(self, request_messages, response_messages=None, **kwargs) -> None:
        text = request_messages[-1].text if request_messages else ""
        if "I like " in text:
            self.pref_city = text.split("I like ", 1)[1].split()[0]

# ---------- コンテキストプロバイダー初期化 ----------
memory = UserPrefMemory()

# ---------- エージェント・スレッドの構築 ----------
agent = ChatAgent(
    name="MemoryAgent",
    chat_client=AzureOpenAIChatClient(...),
    instructions="文脈を活用して丁寧に答える。",
    context_providers=[memory],
)

thread = agent.get_new_thread()

# ---------- 実行 ----------
async def main():

    # 1回目:学習フェーズ
    await agent.run("I like Paris", thread=thread)

    # 2回目:活用フェーズ(Paris が回答に反映される)
    await agent.run("おすすめスポットは?", thread=thread)

    # 長期記憶を確認
    user_pref_memory = thread.context_provider.providers[0]
    if user_pref_memory:
        print("\n=== 記憶された情報 ===")
        print(f"ユーザーの好みの都市: {user_pref_memory.pref_city}")

asyncio.run(main())

# ---------- 出力例 ----------
# === 記憶された情報 ===
# ユーザーの好みの都市: Paris

概要

Workflows は、複数の Executor とそれらを結ぶ Edge を組み合わせ、処理のフローをグラフとして表現する仕組みです。
Agent Framework では、並列実行やファンアウト・ファンイン、状態管理、Human-in-the-Loop などを駆使して複雑なビジネスプロセスをワークフローとして定義し実行することができます。

image.png

ワークフローの構成要素

Workflows は、次の 4 つの要素で構成されています。

要素名 役割 説明
Executor 処理ユニット 入力を受け取り、処理し、次のステップまたは最終出力へ送る
Edge 接続 Executor 同士をつなぎ、ルーティング・条件分岐・並列化を表現
Workflow 実行器 Executor と Edge からなる有向グラフそのもの
Event 可観測性 実行中の状態や応答を追跡(ストリーミング / 非ストリーミング両対応)

Executor

Executor はワークフローの最小単位です。
入力を受け → 処理し → 出力(メッセージ or 最終結果)を返す という役割を持ちます。

  • LLM エージェントを内包させる
  • Python などのプログラム処理だけを担当させる

といった両方の活用ができます。

定義方法

Executor の定義方法には、以下の 2 パターンがサポートされています。

記法 説明 適した用途
クラス継承 + @handler 入力型ごとに複数ハンドラを持てる。制御が明確 状態管理・複数入力対応
関数ベース(@executor 最も簡潔。1つの処理で完結 小さな関数的処理

クラスベース例

class UpperCase(Executor):
    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        await ctx.send_message(text.upper())

関数ベース例

@executor(id="upper_case_executor")
async def upper_case(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(text.upper())

「型」設計が重要

Agent Framework では Executor に 型注釈 を付けることで、

  • 「どんなデータを受け取るか」(Input Type)
  • 「下流に何を渡すか」(T_Out)
  • 「ワークフローに最終的に返す値は何か」(T_W_Out)

を明示的に宣言します。

ワークフロー構築時(build 時)に型整合性が検査されるため、
大規模・複雑なフローでも安全に構築できるのが大きな特徴です。

image.png

入力型(Input Type)
Executor が 何を受信するか を示す型です。

スタイル 入力型(Input Type)の指定箇所
クラスベース @handler メソッドの第2引数 (※ 第一引数は self
関数ベース 関数の第1引数

メッセージ送信型(T_Out)
次の Executor に送るメッセージの型と送付方法です。

  • WorkflowContext[T_Out, T_W_Out]1 番目の T_Uot で型を指定します。
  • ctx.send_message()次の Executor へ送るデータ を指定します。
class UpperCase(Executor):

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        await ctx.send_message(text.upper())  # ここで T_Out = str

ワークフロー出力型(T_W_Out)
ワークフロー全体の最終出力にデータを送りたい時の型と送付方法です。

  • WorkflowContext[T_Out, T_W_Out]2 番目の T_W_Out で型を指定します。
  • ctx.yield_output()ワークフロー全体の最終出力 を指定します。
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(text[::-1])  # ここで T_W_Out = str

型指定の一例

メッセージ送信 最終出力 意味
WorkflowContext なし なし [Never, Never] と同じ挙動
WorkflowContext[str] str なし メッセージのみ
WorkflowContext[Any, Any] Any Any どちらも型自由(試作向き)
WorkflowContext[Never, int] なし int 最終出力だけ返す終端

ワークフローの構築

構築プロセス

ワークフローの構築には、WorkflowBuilder クラスを使用します。

  1. WorkflowBuilder を初期化
  2. 開始 Executor を指定
  3. EdgeExecutor 同士を接続(有向グラフを設計)
  4. Build して実行モデルを確定
# 1. WorkflowBuilder を初期化
builder = WorkflowBuilder()

# 2. 開始 Executor を指定
builder.set_start_executor(upper_case)

# 3. Edge で Executor 同士を接続(有向グラフを設計)
builder.add_edge(executor1, executor2)
builder.add_fan_out_edges(executor2, [executor3, executor4])
builder.add_fan_in_edge([executor3, executor4], executor5)
...

# 4. Build して実行モデルを確定
workflow = builder.build()

ビルド時の検証ステップ

ビルド時には、次の静的検証が行われ、実行時エラーを未然に防ぎます。

検証項目 内容
型互換性 接続された Executor 間でメッセージ型が互換であることを確認
グラフ接続性 すべての Executor が開始点から 到達可能 であることを検証
Executor バインディング 各 Executor が正しく バインド/インスタンス化 されているか確認
Edge 検証 重複エッジ無効な接続 を検知

豊富なエッジパターン

Agent Framework の Edge には、実務のフロー設計にそのまま使えるルーティング手法が揃っています。条件分岐やファンアウト/ファンインなど、柔軟に表現できます。

エッジ種類 説明
Direct Edge 一対一で素直につなぐ
Conditional Edge 条件が成立したときだけ通す
Switch / Case Edge 複数条件のいずれかへ振り分ける
Fan-Out Edge 1 → 多へ分岐(並列実行)
Fan-In Edge 多 → 1 へ合流(並列結果を集約)

Direct Edge

2 つの Executor を直列に接続します。add_edge(source, target) を使います。

image.png

サンプルコード
import asyncio

from agent_framework import (
    WorkflowBuilder,
    WorkflowContext,
    executor
)
from typing_extensions import Never

# ---------- Executor 定義 ----------
@executor(id="upper_case_executor")
async def upper_case(text: str, ctx: WorkflowContext[str]) -> None:
    """入力テキストを大文字に変換"""
    await ctx.send_message(text.upper())

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """入力テキストを反転"""
    await ctx.yield_output(text[::-1])

# ---------- ワークフロー構築 ----------
# ワークフローの構築
# 1. WorkflowBuilder を初期化
# 2. 開始 Executor を指定
# 3. Edge で Executor 同士を接続(有向グラフを設計)
# 4. Build して実行モデルを確定
builder = WorkflowBuilder()
builder.set_start_executor(upper_case)
builder.add_edge(upper_case, reverse_text)
workflow = builder.build()

# ---------- ワークフロー実行 ----------
async def main() -> None:

    events = await workflow.run("hello world")
    print(events.get_outputs())   
    
if __name__ == "__main__":
    asyncio.run(main())

# ---------- 出力例 ----------
# ['DLROW OLLEH']

Conditional Edge

特定の条件を満たした場合にのみ通過できる「条件付きエッジ」です。
add_edge(..., condition=<条件関数>) のように condition 属性へ条件関数を指定すると、その関数が True を返したときだけエッジが有効化されます。

image.png

サンプルコード
import asyncio
from typing import Never

from agent_framework import (
    WorkflowBuilder, 
    WorkflowContext, 
    executor, 
)

# ---------- Executor 定義 ----------
@executor(id="start")
async def start(text: str, ctx: WorkflowContext[str]) -> None:
    """開始ノード: 入力文字列をそのまま下流に流す"""
    await ctx.send_message(text)

@executor(id="handle_normal")
async def handle_normal(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """通常処理: 最終出力(yield_output)を返す"""
    await ctx.yield_output(f"OK: {text}")

@executor(id="handle_spam")
async def handle_spam(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """スパム処理: 最終出力(yield_output)を返す"""
    await ctx.yield_output("SPAM: blocked")

# ---------- ルーティング条件 ----------
def is_spam(msg: str) -> bool:
    """メッセージに 'spam' を含むかどうか(単純な条件判定)"""
    return isinstance(msg, str) and ("spam" in msg.lower())

def is_not_spam(msg: str) -> bool:
    """スパムではないか"""
    return isinstance(msg, str) and ("spam" not in msg.lower())

# ---------- ワークフロー構築 ----------
builder = WorkflowBuilder()
builder.set_start_executor(start)
builder.add_edge(start, handle_spam,   condition=is_spam)      # - "spam" を含む → handle_spam
builder.add_edge(start, handle_normal, condition=is_not_spam)  # - "spam" を含まない → handle_normal
workflow = builder.build()

# ---------- 実行例 ----------
async def main():
    # 例1: 通常メッセージ
    events = await workflow.run("Hello team")
    print("Output (normal):", events.get_outputs()[0])

    # 例2: スパムメッセージ
    events = await workflow.run("Buy now!! SPAM offer!!")
    print("Output (spam):  ", events.get_outputs()[0])

    # 可視化
    from agent_framework import WorkflowViz
    viz = WorkflowViz(workflow)
    print("Mermaid:")
    print(viz.to_mermaid())

if __name__ == "__main__":
    asyncio.run(main())

# ---------- 出力例 ----------
# Output (normal): OK: Hello team
# Output (spam):   SPAM: blocked

Switch / Case Edge

Switch / Case Edge は、複数の条件を上から順番に評価し、最初に True になったターゲット Executor へメッセージをルーティングする仕組みです。

image.png

サンプルコード
import asyncio
from typing import Any, Never

from agent_framework import (
    WorkflowBuilder,      
    WorkflowContext,      
    executor,             
    Case,
    Default
)



# ---------- Executor 定義 ----------
@executor(id="start")
async def start(msg: Any, ctx: WorkflowContext[Any]) -> None:
    """開始ノード: 入力をそのまま下流へ流す(Switch の起点)"""
    await ctx.send_message(msg)

@executor(id="handle_text")
async def handle_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """文字列: 大文字にして最終出力"""
    await ctx.yield_output(f"TEXT: {text.upper()}")

@executor(id="handle_number")
async def handle_number(n: int, ctx: WorkflowContext[Never, str]) -> None:
    """数値: 2倍して最終出力"""
    await ctx.yield_output(f"NUMBER: {n * 2}")

@executor(id="handle_other")
async def handle_other(obj: Any, ctx: WorkflowContext[Never, str]) -> None:
    """その他: 型名を返して最終出力"""
    await ctx.yield_output(f"OTHER: {type(obj).__name__}")


# ---------- ワークフロー構築 ----------
builder = WorkflowBuilder()
builder.set_start_executor(start)

# Switch-Case:
# - ソースからの入力が、"文字列" なら handle_text
# - ソースからの入力が、"数値" なら handle_number
# - ソースからの入力が、そのどれにも該当しなければ Default -> handle_other
builder.add_switch_case_edge_group(
    source=start,
    cases=[
        Case(condition=lambda m: isinstance(m, str), target=handle_text),
        Case(condition=lambda m: isinstance(m, int), target=handle_number),
        Default(target=handle_other),
    ],
)

workflow = builder.build()


# ---------- 実行例 ----------
async def main():
    samples = ["hello world", 21, {"x": 1}]
    for s in samples:
        events = await workflow.run(s)
        print("Input :", s)
        print("Output:", events.get_outputs()[0])
        print("-" * 7)

if __name__ == "__main__":
    asyncio.run(main())

# ---------- 出力例 ----------
# Input : hello world
# Output: TEXT: HELLO WORLD
# -------
# Input : 21
# Output: NUMBER: 42
# -------
# Input : {'x': 1}
# Output: OTHER: dict
# -------

Fan-Out / Fan-In Edge

Fan-Out は 1→多 に分岐して並列実行し、Fan-In は 多→1 に集約して結果をまとめます。

image.png

サンプルコード
import asyncio
from typing import Never

from agent_framework import (
    WorkflowBuilder,      
    WorkflowContext,      
    executor,             
)

# ---------- Executors ----------

@executor(id="start")
async def start(text: str, ctx: WorkflowContext[str]) -> None:
    """開始ノード: 入力文字列をそのまま下流へ流す(Fan-Out の起点)"""
    await ctx.send_message(text)

@executor(id="count_words")
async def count_words(text: str, ctx: WorkflowContext[int]) -> None:
    """単語数を数える(スペース区切り・超シンプル)"""
    await ctx.send_message(len(text.split()))

@executor(id="count_chars")
async def count_chars(text: str, ctx: WorkflowContext[int]) -> None:
    """文字数を数える(改行含む・超シンプル)"""
    await ctx.send_message(len(text))

@executor(id="aggregate")
async def aggregate(results: list[int], ctx: WorkflowContext[Never, str]) -> None:
    """集約ノード(Fan-In の終点)"""
    words, chars = results  
    await ctx.yield_output(f"Words: {words}, Chars: {chars}")

# ---------- ワークフロー構築 ----------
builder = WorkflowBuilder()
builder.set_start_executor(start)

# Fan-Out: start から 2 系統に並列分岐
builder.add_fan_out_edges(start, [count_words, count_chars])

# Fan-In: 2 系統の結果が揃ったら aggregate に list で渡される
builder.add_fan_in_edges([count_words, count_chars], aggregate)

workflow = builder.build()

# ---------- 実行例 ----------
async def main():
    text = "Fan out and fan in are powerful workflow patterns."
    events = await workflow.run(text)
    outputs = events.get_outputs() 
    if outputs:
        print("Output:", outputs[0])

if __name__ == "__main__":
    asyncio.run(main())


# ---------- 出力例 ----------
# Output: Words: 9, Chars: 50

共有状態(Shared States)

ワークフロー内では、共有状態を使用することができ、Executor 間の情報共有が容易に実装できます。

image.png

サンプルコード
import asyncio
from typing import Never
from agent_framework import WorkflowBuilder, WorkflowContext, executor

PREFIX_KEY = "greeting_prefix"

# ---------- Executors ----------
@executor(id="save_prefix")
async def save_prefix(name: str, ctx: WorkflowContext[str]) -> None:
    """共有状態に prefix を保存し、名前を下流へ送る"""
    await ctx.set_shared_state(key=PREFIX_KEY, value="Hello")  # 共有状態にデータを保存
    await ctx.send_message(name)

@executor(id="greet")
async def greet(name: str, ctx: WorkflowContext[Never, str]) -> None:
    """共有状態から prefix を取得し、名前を結合して出力する"""
    prefix = await ctx.get_shared_state(key=PREFIX_KEY)       # 共有状態からデータを取得
    await ctx.yield_output(f"{prefix}, {name}.")

# ---------- ワークフロー構築 ----------
builder = WorkflowBuilder()
builder.set_start_executor(save_prefix)
builder.add_edge(save_prefix, greet) 
workflow = builder.build()

# ---------- 実行例 ----------
async def main():
    events = await workflow.run("Taro")
    print(events.get_outputs()[0]) 

if __name__ == "__main__":
    asyncio.run(main())

# ---------- 出力例 ----------
# Hello, Taro.
  • ctx.set_shared_state(key=, value=):
    キーを指定して、値を共有状態ストレージに保存
  • ctx.get_shared_state(key=):
    キーを指定して共有状態から値を取得

チェックポイント

長時間または複数ステップにわたるワークフローでは、処理の途中経過(状態)を「チェックポイント」として保存できます。
途中で失敗しても“最初からやり直し”にならず、保存地点から安全に再開できる仕組みです。

コードイメージ
from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

...

# ---------- チェックポイントを保持するストレージ作成 ----------
checkpoint_storage = InMemoryCheckpointStorage()

# ---------- ワークフロー構築 ----------
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)  
builder.add_edge(start_executor, executor_b)  
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
builder.with_checkpointing(checkpoint_storage) # チェックポイントを指定
workflow = builder.build()

# ---------- 初回実行 ----------
async for event in workflow.run_streaming(input):
...

# ---------- 保存されたチェックポイント一覧を取得 ----------
checkpoints = await checkpoint_storage.list_checkpoints()

# ---------- 最後に保存されたチェックポイントから途中再開 ----------
last_checkpoint = checkpoints[-1]
async for event in workflow.run_stream_from_checkpoint(last_checkpoint.checkpoint_id):
...

保存されるもの
チェックポイントでは、ワークフローを再構築するために必要な以下の状態が保存されます。

保存対象 説明
各 Executor の現在の状態 いまどの処理の途中なのか
次のスーパーステップに渡る保留メッセージ まだ消費されていないメッセージの内容
保留中の要求・応答 未完了のやりとり(API呼び出し・LLMレスポンス)など
共有状態(Shared State) ワークフロー全体で共有している変数/状態

リクエスト&レスポンス

業務ワークフローでは、途中で 人間の承認外部システムの応答非同期ジョブの完了待ち といった外部要因が必要になる区間が頻出します。

Agent Framework のワークフローは、このような「一時停止 → 外部の判断 → 再開」という挙動をネイティブにサポートします。

処理の流れ

リクエスト&レスポンスは、RequestInfoExecutor という特殊な Executor を介して実行します。

この Executor に到達すると、ワークフローは外部応答を待つ状態に遷移します。ホストアプリ(UI/バックエンド)がレスポンスを送ると、停止地点から自動再開されます。

ステップ 何が起きるか 補足
① リクエスト送信 上流 Executor → RequestInfoExecutor に依頼メッセージを送出 依頼ペイロードは RequestInfoMessage 継承型
② 停止&イベント発行 ワークフローを一時停止し、RequestInfoEventアプリ側へ通知 状態は IN_PROGRESS_PENDING_REQUESTSIDLE_WITH_PENDING_REQUESTS
③ 外部から応答 ユーザー/外部が判断し、send_responses_streaming(...)(または send_responses(...))で返却
④ 再開&継続 RequestInfoExecutorRequestResponse を受理し、元の実行経路の次 Executor へ進む チェックポイント対応で安全に再開
サンプルコード

サンプルコードの構成は以下になります。

image.png

import asyncio
from dataclasses import dataclass
from typing_extensions import Never

from agent_framework import (
    WorkflowBuilder,
    WorkflowContext,
    Executor,
    RequestInfoExecutor,
    RequestInfoEvent,
    RequestInfoMessage,
    RequestResponse,
    WorkflowOutputEvent,
    WorkflowStatusEvent,
    WorkflowRunState,
    handler,
)

# ---------- 型定義 ----------
@dataclass
class ApprovalRequest(RequestInfoMessage):
    """承認リクエスト(RequestInfoMessage を継承)"""
    prompt: str = ""
    attempt_count: int = 1

# ---------- 承認管理 Executor ----------
class ApprovalManager(Executor):
    """承認プロセスを管理するExecutor"""

    def __init__(self, id: str | None = None):
        super().__init__(id=id or "approval_manager")

    @handler
    async def start(self, _: str, ctx: WorkflowContext[ApprovalRequest]) -> None:
        """最初の承認リクエストを送信"""
        print("[ApprovalManager] 承認プロセスを開始します")
        await ctx.send_message(ApprovalRequest(
            prompt="レポート公開の承認をお願いします (y/n)",
            attempt_count=1
        ))

    @handler
    async def handle_response(
        self,
        feedback: RequestResponse[ApprovalRequest, str],
        ctx: WorkflowContext[ApprovalRequest, str],
    ) -> None:
        """承認レスポンスを処理"""
        reply = (feedback.data or "").strip().lower()
        request_data = feedback.original_request
        
        print(f"[ApprovalManager] 試行 {request_data.attempt_count}: 応答='{reply}'")
        
        if reply in ("y", "yes", "1", "ok", "承認"):
            # 承認された場合
            print(f"[ApprovalManager] 承認されました!(試行回数: {request_data.attempt_count})")
            await ctx.yield_output(f"レポートが正常に公開されました (試行回数: {request_data.attempt_count})")
            return
        
        # 却下された場合、再試行
        next_attempt = request_data.attempt_count + 1
        print(f"[ApprovalManager] 却下されました。{next_attempt} 回目の承認をリクエストします...")
        
        await ctx.send_message(ApprovalRequest(
            prompt=f"レポート公開の承認をお願いします (y/n) - {next_attempt} 回目の試行",
            attempt_count=next_attempt
        ))

# ---------- ワークフロー構築 ----------
def build_workflow():
    """承認ワークフローを構築"""
    approval_manager = ApprovalManager(id="approval_manager")
    request_info = RequestInfoExecutor(id="request_info")
    
    workflow = (
        WorkflowBuilder()
        .set_start_executor(approval_manager)
        .add_edge(approval_manager, request_info)  # 承認リクエスト送信
        .add_edge(request_info, approval_manager)  # 承認レスポンス受信
        .build()
    )
    
    return workflow

# ---------- メイン実行 ----------
async def main():
    """メイン実行関数(公式サンプルのパターンに基づく)"""
    workflow = build_workflow()
    
    pending_responses: dict[str, str] | None = None
    completed = False
    workflow_output: str | None = None
    
    while not completed:
        # 初回は run_stream、2回目以降は send_responses_streaming
        stream = (
            workflow.send_responses_streaming(pending_responses) 
            if pending_responses 
            else workflow.run_stream("start")
        )
        
        # イベントを収集
        events = [event async for event in stream]
        pending_responses = None
        
        # リクエストと出力を収集
        requests: list[tuple[str, str, int]] = []  # (request_id, prompt, attempt_count)
        
        for event in events:
            if isinstance(event, RequestInfoEvent) and isinstance(event.data, ApprovalRequest):
                # 承認リクエストイベント
                requests.append((
                    event.request_id, 
                    event.data.prompt, 
                    event.data.attempt_count
                ))
            elif isinstance(event, WorkflowOutputEvent):
                # ワークフロー出力(完了)
                workflow_output = str(event.data)
                completed = True
        
        # 状態変化を表示(デバッグ用)
        pending_status = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
            for e in events
        )
        idle_with_requests = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
            for e in events
        )
        
        if pending_status:
            print("[状態] IN_PROGRESS_PENDING_REQUESTS (リクエスト処理中)")
        if idle_with_requests:
            print("[状態] IDLE_WITH_PENDING_REQUESTS (人間の入力待ち)")
        
        # 承認リクエストがある場合、ユーザーに入力を求める
        if requests and not completed:
            responses: dict[str, str] = {}
            
            for req_id, prompt, attempt_count in requests:
                print(f"\n{'='*60}")
                print(f"承認リクエスト (試行 {attempt_count})")
                print(f"{'='*60}")
                print(f"{prompt}")
                
                answer = input("承認しますか? (y/n/exit): ").strip().lower()
                
                if answer == "exit":
                    print("ユーザーによって処理が中断されました")
                    return
                
                responses[req_id] = answer
                print(f"入力結果: {'承認' if answer in ('y', 'yes', '1', 'ok', '承認') else '却下'}")
            
            pending_responses = responses
    
    # 最終結果を表示
    print(f"\n{'='*70}")
    print(f"最終結果: {workflow_output}")
    print(f"{'='*70}")
    
    # ワークフロー図を表示
    print(f"\n{'='*50}")
    print("ワークフロー図 (Mermaid)")
    print(f"{'='*50}")
    from agent_framework import WorkflowViz
    viz = WorkflowViz(workflow)
    print(viz.to_mermaid())

if __name__ == "__main__":
    asyncio.run(main())


# ---------- 出力 ----------
# [ApprovalManager] 承認プロセスを開始します
# [状態] IN_PROGRESS_PENDING_REQUESTS (リクエスト処理中)
# [状態] IDLE_WITH_PENDING_REQUESTS (人間の入力待ち)

# ============================================================
# 承認リクエスト (試行 1)
# ============================================================
# レポート公開の承認をお願いします (y/n)
# 承認しますか? (y/n/exit): n
# 入力結果: 却下
# [ApprovalManager] 試行 1: 応答='n'
# [ApprovalManager] 却下されました。2 回目の承認をリクエストします...
# [状態] IN_PROGRESS_PENDING_REQUESTS (リクエスト処理中)
# [状態] IDLE_WITH_PENDING_REQUESTS (人間の入力待ち)

# ============================================================
# 承認リクエスト (試行 2)
# ============================================================
# レポート公開の承認をお願いします (y/n) - 2 回目の試行
# 承認しますか? (y/n/exit): y
# 入力結果: 承認
# [ApprovalManager] 試行 2: 応答='y'
# [ApprovalManager] 承認されました!(試行回数: 2)

# ======================================================================
# 最終結果: レポートが正常に公開されました (試行回数: 2)
# ======================================================================

オブザーバビリティ

Agent Framework には、ワークフロー内部で「何が起きているか」を可視化するための標準オブザーバビリティ機構が備わっています。

結果だけでなく「なぜその経路を通り、どこに時間がかかったのか」まで追跡できるため、デバッグ・分析・監査などに有用です。

さらに内部では OpenTelemetry (OTel) に対応した Span が生成されており、そのまま Application Insights / Aspire / Datadog などへ転送してモニタリングできます。

image.png

可視化

Agent Framework はワークフローを構造的に保持しているため、Executor のつながり(グラフ構造)や実際の実行ルートを可視化することが容易にできます。
複雑なワークフローを開発・運用する際の、構造の理解や設計レビューがとてもスムーズになります。

コードイメージ

Agent Framework では WorkflowViz を使うことで、簡単に Mermaid 形式の図として出力できます。

from agent_framework import WorkflowBuilder
from agent_framework import WorkflowViz

# (省略)

# ---------- ワークフロー構築 ----------
builder = WorkflowBuilder()
builder.set_start_executor(start)
builder.add_fan_out_edges(start, [count_words, count_chars])
builder.add_fan_in_edges([count_words, count_chars], aggregate)
workflow = builder.build()

# ---------- 可視化 ----------
viz = WorkflowViz(workflow)
print(viz.to_mermaid())

Mermaid で出力した際は、Mermaid Live Editor などで可視化できます。

image.png

Workflow as Agent

通常、ワークフローは「複数の Executor をつないだ 1 つの処理のかたまり」です。
Agent Framework では、この ワークフロー全体を 1 つのエージェントとして再利用できる 仕組みが提供されています。
それが as_agent() です。

workflow_agent = (
    WorkflowBuilder()
        .set_start_executor(worker)
        .add_edge(worker, reviewer)   
        .build()
        .as_agent(name="work_review_pipeline")  # ← ここで1つの「エージェント」として扱える
)

つまり、「複雑な処理手順」をひとまとめにした専門家エージェントが作成できます。

ワークフローにエージェントを組み込む

ワークフローにエージェントを組み込みたい場合、add_edge()AgentProtocol を満たすエージェントをそのまま渡すだけで動いてくれます。この時、内部で自動的に AgentExecutor にラップされます。

writer_agent = chat_client.create_agent(...)
reviewer_agent = chat_client.create_agent(...)

builder = WorkflowBuilder()
builder.set_start_executor(writer_agent)     # ← そのままエージェントを置ける
builder.add_edge(writer_agent, reviewer_agent)
workflow = builder.build()

AgentExecutor の役割
ワークフローに組み込まれたエージェントは、内部では AgentExecutor として扱われます。
このラッパーが入ることで、エージェントをワークフロー上でも一貫した「Executor」として扱えるようになります。

AgentExecutor が担う処理 説明
入力整形 str / ChatMessage / list[ChatMessage] を受理し、LLM が理解できる会話形式へ正規化
出力統一 エージェントの応答を AgentExecutorResponse に統一(executor_id / agent_run_response / full_conversation を保持)
イベント生成 ストリーミング:AgentRunUpdateEvent(逐次チャンク)
非ストリーミング:AgentRunEvent(完全応答)
文脈継承 full_conversation を自動で後続 Executor に引き継ぎ、会話文脈を途切れさせずに処理

つまり、エージェントをEdge に載せるだけでワークフローの Executor として扱えるのは、この AgentExecutor が裏側で入出力処理とコンテキスト継承を肩代わりしているためです。

さらに詳細な制御(入力変換ロジック/出力整形/ストリーミング制御など)が必要な場合は、Custom Agent Executor を独自実装して差し替えることも可能です。

概要

Agent Framework では、複数のエージェントを連携(オーケストレーション)させる仕組みとして ワークフロー を採用しています。

WorkflowBuilder().add_edge(...) を使えば 0 から自由に構築できますが、
よくある連携パターンについては 「オーケストレーションパターン(ビルディングブロック)」 として実装されています。

まずはオーケストレーションパターンで表現できる範囲を活用し、必要に応じてスクラッチで拡張することで、設計・実装コストを抑えられます。

現在(2025年10月 / Python SDK)提供されているオーケストレーションパターンは以下の 3 つです。

パターン 説明 ユースケース
Sequential 決められた順番でエージェントを直列につなぎ、結果を順送りに渡していく。 ステップバイステップ処理、ETL/分析パイプライン
Concurrent 同じ入力を複数エージェントに同時ブロードキャストし、結果を並列収集する。 並列分析、比較検討、モデルアンサンブル
Magentic Magentic-One ベース。進捗状況に応じて「次に誰を起用するか」を動的判断 探索的タスク、R&D、長尺・複雑な協調作業

Sequential

Sequential はもっとも直感的な構造で、「A → B → C…」と順番に処理を渡していく方式です。

image.png

from agent_framework import SequentialBuilder

workflow = (
    SequentialBuilder()
    .participants([summarizer_agent, translator_agent, qa_agent])  # ← 複数エージェントを直列接続
    .build()
)

async for event in workflow.run_streaming("..."):
    ...

特徴・メリット

特徴 説明
構築が簡単 .participants([...]) に並べるだけで成立
文脈共有 直列チェーン全体で共通の会話コンテキストを維持
出力が会話履歴 最終出力は「最後の応答単体」ではなく全体の対話履歴

WorkflowBuilder().add_edge(...) でも直列化できますが、最終出力が “最後の応答だけ” になります。
SequentialBuilder を使うと「どのエージェントがどう返したか」まで含む履歴がそのまま結果になるため、後続工程にも活用しやすくなります。

Concurrent

Concurrent は、1つの入力を複数エージェントに同時配信(fan-out)し、結果をまとめる(fan-in) 方式です。

image.png

from agent_framework import ConcurrentBuilder

workflow = (
    ConcurrentBuilder()
    .participants([agent_a, agent_b, agent_c])
    .build()
)

async for event  in workflow.run_streaming("..."):
    ...

内部で動いているしくみ

ConcurrentBuilder は、次の 2 つの内部コンポーネントを自動で差し込みます。

コンポーネント 役割
Dispatcher fan-out(全員に同じ入力)
Aggregator fan-in(各エージェントの最終応答をまとめて返す)

デフォルトの Aggregator は 「1ユーザープロンプト + 各エージェントの最終出力」list[ChatMessage] として返します。

例えば 3 つのエージェントを並列実行した場合は、以下の応答が得られます。

[ USER_PROMPT, Aの応答, Bの応答, Cの応答 ]

カスタム集約も可能(Custom Aggregator)

さらに Concurrent では、集約(fan-in)処理を担う Aggregator 自由に差し替えることもできます。

def summarize(results, ctx):
    """ 各エージェントの応答をまとめるカスタム集約関数 """
    replies = [
        next(m for m in reversed(r.agent_run_response.messages) if m.role == Role.ASSISTANT)
        for r in results
    ]
    merged_text = " | ".join(m.text for m in replies)
    return [ChatMessage(Role.ASSISTANT, author_name="custom_aggregator", text=merged_text)]


workflow = (
    ConcurrentBuilder()
    .participants([agent1, agent2, agent3])
    .with_aggregator(summarize) # Custom Aggretator として設定
    .build()
)

Custom Aggretator が返り値を返せば、それがワークフロー最終出力となります。

[ custom_aggregator の応答 ]

Magentic

Magentic は、AutoGen によって開発された Magentic-One をベースに設計された「動的オーケストレーションパターン」です。

Sequential / Concurrent が「ワークフローの構造を先に決めておく(=静的接続)」であるのに対して、Magentic は「状況を見ながら誰を次に起用するかを毎ターン判断する(=動的接続)」という特徴を持ちます。

そのため、単なるエージェントの配列実行ではなく、以下のような“マネジメント・ループ”を内部に持ちます:

  1. Facts / Plan を作成(タスク台帳生成)
  2. 進捗評価(Progress Ledger)
  3. 次に動かすエージェントを選抜
  4. 必要なら Replan → 1 に戻る(完了で終了)

つまり、人間のマネージャーがタスクの進捗に応じて「今この人を動かすべき」と判断する構造に近いモデルを実現しています。

image.png

from agent_framework import MagenticBuilder
from agent_framework.azure import AzureChatClient
from azure.identity import AzureCliCredential

# 参加者(=起用候補となるエージェント一覧)
participants = {
    "researcher": researcher_agent,
    "writer": writer_agent,
    "reviewer": reviewer_agent,
}

workflow = (
    MagenticBuilder()
    .participants(**participants)
    .with_standard_manager(chat_client=AzureChatClient(...))  # ← Manager(司令塔)を注入
    .build()
)

async for event in workflow.run_streaming("気候変動と日本経済への影響を要約してレポート化して"):
    ...

標準マネージャーの挙動をカスタマイズ(プロンプト単位でカスタム可能)

標準マネージャー(with_standard_manager(...) )には、Magentic の各フェーズ(台帳生成・進捗評価・最終回答など)の 挙動を制御するためのプロンプト がデフォルトで設定されています。

開発者はこれらをオーバーライドすることで「マネージャーの思考」をカスタマイズできます。

パラメータ 内容 フェーズ カスタマイズ例
instructions Manager へのシステム指示 全体 品質・禁止事項
task_ledger_* Facts/Plan 管理 計画系 証拠ベース強化
progress_ledger_prompt 誰を起用するか 調整 指名ルール
final_answer_prompt 最終出力方針 完了 体裁・形式

Human-in-the-Loop にも標準対応

Magentic は計画(Plan)の段階に 人間承認ステップ(Human-in-the-Loop) を挟めます。

workflow = (
    MagenticBuilder()
    .participants(**participants)
    .with_standard_manager(chat_client=AzureChatClient(...))
    .with_plan_review(True)  # ← Human-in-the-Loop を有効化
    .build()
)

この Human-in-the-Loop によって

  • 誤った前提で進行することを事前に防止
  • 曖昧な要件を人間判断で補完
  • 「最終フェーズで崩れる」事故を低減

結果として、プロセス全体の品質・透明性・成功率が向上します。

Agent Framework では、ローカルで開発したエージェント/ワークフローを ブラウザ上の専用 UI(Dev UI) で可視化・テストできます。

image.png

image.png

Semantic Kernel や AutoGen で既存実装がある場合は、以下の公式ガイドを基に移行計画を立てられます。

なお、2025年10月時点で Microsoft Agent Framework は Preview 段階のため、GA(一般提供)に向けて破壊的変更の可能性があります。プロダクション移行は GA 後を推奨します。




Source link

関連記事

コメント

この記事へのコメントはありません。