ブログ記事

AI Agent開発実践ガイド2025 - LangChain/LangGraphで作る自律型エージェント

LangChainとLangGraphを使用した自律型AIエージェントの設計・実装方法を徹底解説。Function Calling、マルチエージェントシステム、メモリ管理など、2025年最新の実践的なテクニックを豊富なコード例とともに紹介します。

17分で読めます
R
Rina
Daily Hack 編集長
AI・機械学習
AI Agent LangChain LangGraph Python 自律型エージェント
AI Agent開発実践ガイド2025 - LangChain/LangGraphで作る自律型エージェントのヒーロー画像

2025 年、AI エージェントの実用化が急速に進んでいます。単純なチャットボットを超えて、複雑なタスクを自律的に実行できる AI エージェントは、ビジネスプロセスの自動化に革命をもたらしています。本記事では、LangChainと LangGraph を活用した実践的な AI エージェント開発手法を、豊富な実装例とともに解説します。

この記事で学べること

  • LangChain/LangGraph の基本概念と使い分け
  • Function Calling を活用した自律的なタスク実行
  • マルチエージェントシステムの設計と実装
  • メモリ管理と状態管理の実践手法
  • 本番環境でのコスト最適化とパフォーマンスチューニング

目次

  1. AI エージェントの進化と 2025 年の展望
  2. LangChainと LangGraph の基本
  3. 環境構築とセットアップ
  4. 基本的なエージェントの実装
  5. Function Calling とツール統合
  6. 高度な実装:マルチエージェントシステム
  7. メモリ管理と状態永続化
  8. パフォーマンス最適化
  9. 実践例:カスタマーサポートエージェント
  10. まとめとベストプラクティス

AIエージェントの進化と2025年の展望

AI エージェントは、単なる質問応答システムから、複雑なタスクを自律的に実行できるシステムへと進化しています。2025 年現在、以下のような特徴を持つエージェントが実用化されています:

2025年のAIエージェントの主要特徴
特徴 説明 活用例
自律的タスク実行 人間の介入なしに複数ステップのタスクを完遂 データ分析、レポート作成
コンテキスト理解 長期的な会話履歴と状態を保持 カスタマーサポート、個人秘書
ツール統合 外部API・DBとの連携 リアルタイムデータ取得、自動化
協調動作 複数エージェントの連携 複雑な問題解決、分散処理

LangChainとLangGraphの基本

アーキテクチャの違い

LangChainとLangGraphのアーキテクチャ比較

チャートを読み込み中...

使い分けのガイドライン

# LangChain: 直線的な処理フロー
from langchain import LLMChain

chain = LLMChain(
    llm=llm,
    prompt=prompt,
    memory=memory
)

result = chain.run("タスクを実行")
# LangGraph: 状態を持つ複雑なフロー
from langgraph import StateGraph

workflow = StateGraph(AgentState)
workflow.add_node("analyze", analyze_task)
workflow.add_node("execute", execute_task)
workflow.add_edge("analyze", "execute")

app = workflow.compile()
result = app.invoke({"task": "複雑なタスク"})
LangChain: シンプルなチェーン
# LangChain: 直線的な処理フロー
from langchain import LLMChain

chain = LLMChain(
    llm=llm,
    prompt=prompt,
    memory=memory
)

result = chain.run("タスクを実行")
LangGraph: 複雑なワークフロー
# LangGraph: 状態を持つ複雑なフロー
from langgraph import StateGraph

workflow = StateGraph(AgentState)
workflow.add_node("analyze", analyze_task)
workflow.add_node("execute", execute_task)
workflow.add_edge("analyze", "execute")

app = workflow.compile()
result = app.invoke({"task": "複雑なタスク"})

環境構築とセットアップ

まず、必要なライブラリをインストールします:

# 基本的なインストール
pip install langchain langgraph langchain-openai langchain-anthropic

# 追加ツール
pip install chromadb faiss-cpu redis python-dotenv

# 開発ツール
pip install jupyter notebook pytest black

環境変数の設定

# .env ファイル
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
REDIS_URL=redis://localhost:6379
LANGSMITH_API_KEY=your_langsmith_key
LANGSMITH_PROJECT=my-agent-project

基本的なエージェントの実装

シンプルなReActエージェント

from langchain.agents import create_react_agent
from langchain_openai import ChatOpenAI
from langchain.tools import Tool
from langchain.prompts import PromptTemplate
import os
from dotenv import load_dotenv

load_dotenv()

# LLMの初期化
llm = ChatOpenAI(
    model="gpt-4-turbo",
    temperature=0,
    streaming=True
)

# ツールの定義
def search_database(query: str) -> str:
    """データベースから情報を検索"""
    # 実際のDB検索ロジック
    return f"検索結果: {query}に関する情報"

def calculate(expression: str) -> str:
    """数式を計算"""
    try:
        result = eval(expression)
        return f"計算結果: {result}"
    except:
        return "計算エラー"

tools = [
    Tool(
        name="DatabaseSearch",
        func=search_database,
        description="データベースから情報を検索します"
    ),
    Tool(
        name="Calculator",
        func=calculate,
        description="数式を計算します"
    )
]

# プロンプトテンプレート
prompt = PromptTemplate.from_template("""
あなたは有能なアシスタントです。与えられたタスクを実行するために、
利用可能なツールを適切に使用してください。

利用可能なツール:
{tools}

ツールの使用形式:
Action: ツール名
Action Input: 入力値

タスク: {input}
思考過程: {agent_scratchpad}
""")

# エージェントの作成
agent = create_react_agent(
    llm=llm,
    tools=tools,
    prompt=prompt
)

# エージェントの実行
from langchain.agents import AgentExecutor

agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=5
)

# 実行例
result = agent_executor.invoke({
    "input": "データベースから2024年の売上データを検索し、前年比を計算してください"
})

print(result["output"])

Function Callingとツール統合

高度なFunction Calling実装

from typing import List, Dict, Any
from pydantic import BaseModel, Field
from langchain.tools import StructuredTool
from langchain_openai import ChatOpenAI
import json
import requests

# 構造化された入力の定義
class EmailParams(BaseModel):
    recipient: str = Field(description="メールの宛先")
    subject: str = Field(description="メールの件名")
    body: str = Field(description="メール本文")
    attachments: List[str] = Field(default=[], description="添付ファイルのパス")

class DatabaseQuery(BaseModel):
    table: str = Field(description="検索対象のテーブル名")
    filters: Dict[str, Any] = Field(description="検索条件")
    limit: int = Field(default=10, description="取得件数")

# ツール関数の実装
async def send_email(params: EmailParams) -> str:
    """メールを送信する"""
    # 実際のメール送信ロジック
    try:
        # SendGrid, AWS SES等のAPIを呼び出す
        response = requests.post(
            "https://api.sendgrid.com/v3/mail/send",
            headers={
                "Authorization": f"Bearer {os.getenv('SENDGRID_API_KEY')}",
                "Content-Type": "application/json"
            },
            json={
                "personalizations": [{
                    "to": [{"email": params.recipient}]
                }],
                "from": {"email": "agent@example.com"},
                "subject": params.subject,
                "content": [{
                    "type": "text/plain",
                    "value": params.body
                }]
            }
        )
        return f"メール送信成功: {params.recipient}"
    except Exception as e:
        return f"メール送信失敗: {str(e)}"

async def query_database(params: DatabaseQuery) -> str:
    """データベースをクエリする"""
    # 実際のDB接続とクエリ実行
    import sqlite3
    
    conn = sqlite3.connect('company.db')
    cursor = conn.cursor()
    
    # 動的クエリ構築(実際はSQLインジェクション対策が必要)
    where_clause = " AND ".join([f"{k} = ?" for k in params.filters.keys()])
    query = f"SELECT * FROM {params.table}"
    if where_clause:
        query += f" WHERE {where_clause}"
    query += f" LIMIT {params.limit}"
    
    cursor.execute(query, list(params.filters.values()))
    results = cursor.fetchall()
    conn.close()
    
    return json.dumps(results, ensure_ascii=False)

# 構造化ツールの作成
email_tool = StructuredTool.from_function(
    func=send_email,
    name="send_email",
    description="メールを送信します",
    args_schema=EmailParams,
    coroutine=send_email
)

db_tool = StructuredTool.from_function(
    func=query_database,
    name="query_database",
    description="データベースから情報を取得します",
    args_schema=DatabaseQuery,
    coroutine=query_database
)

# Function Calling対応のLLM
llm = ChatOpenAI(
    model="gpt-4-1106-preview",
    temperature=0
).bind_tools([email_tool, db_tool])

高度な実装:マルチエージェントシステム

LangGraphを使用した協調エージェント

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, Sequence
from operator import add
import asyncio

# 状態の定義
class AgentState(TypedDict):
    messages: Annotated[Sequence[str], add]
    task: str
    subtasks: List[str]
    results: Dict[str, Any]
    current_agent: str
    completed: bool

# 専門エージェントの定義
class ResearchAgent:
    """調査を担当するエージェント"""
    
    def __init__(self, llm):
        self.llm = llm
        self.tools = [
            Tool(name="web_search", func=self.web_search, description="Web検索"),
            Tool(name="arxiv_search", func=self.arxiv_search, description="論文検索")
        ]
    
    async def web_search(self, query: str) -> str:
        # 実際のWeb検索API呼び出し
        return f"Web検索結果: {query}"
    
    async def arxiv_search(self, query: str) -> str:
        # arXiv API呼び出し
        return f"論文検索結果: {query}"
    
    async def process(self, state: AgentState) -> AgentState:
        """調査タスクを実行"""
        task = state["task"]
        
        # LLMにツール使用を判断させる
        response = await self.llm.ainvoke(
            f"タスク: {task}\n利用可能なツール: {self.tools}\n適切なツールを使って調査してください。"
        )
        
        # 結果を状態に追加
        state["results"]["research"] = response
        state["messages"].append(f"調査完了: {response}")
        
        return state

class AnalysisAgent:
    """分析を担当するエージェント"""
    
    def __init__(self, llm):
        self.llm = llm
    
    async def process(self, state: AgentState) -> AgentState:
        """調査結果を分析"""
        research_results = state["results"].get("research", "")
        
        analysis = await self.llm.ainvoke(
            f"以下の調査結果を分析してください:\n{research_results}"
        )
        
        state["results"]["analysis"] = analysis
        state["messages"].append(f"分析完了: {analysis}")
        
        return state

class ReportAgent:
    """レポート作成を担当するエージェント"""
    
    def __init__(self, llm):
        self.llm = llm
    
    async def process(self, state: AgentState) -> AgentState:
        """最終レポートを作成"""
        results = state["results"]
        
        report = await self.llm.ainvoke(
            f"""以下の情報を基に包括的なレポートを作成してください:
            調査結果: {results.get('research', '')}
            分析結果: {results.get('analysis', '')}
            """
        )
        
        state["results"]["report"] = report
        state["messages"].append(f"レポート作成完了: {report}")
        state["completed"] = True
        
        return state

# マルチエージェントワークフローの構築
class MultiAgentSystem:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4-turbo")
        self.research_agent = ResearchAgent(self.llm)
        self.analysis_agent = AnalysisAgent(self.llm)
        self.report_agent = ReportAgent(self.llm)
        
        # ワークフローグラフの構築
        self.workflow = StateGraph(AgentState)
        
        # ノードの追加
        self.workflow.add_node("research", self.research_agent.process)
        self.workflow.add_node("analysis", self.analysis_agent.process)
        self.workflow.add_node("report", self.report_agent.process)
        
        # エッジの定義
        self.workflow.add_edge("research", "analysis")
        self.workflow.add_edge("analysis", "report")
        self.workflow.add_edge("report", END)
        
        # エントリーポイント
        self.workflow.set_entry_point("research")
        
        # コンパイル
        self.app = self.workflow.compile()
    
    async def run(self, task: str) -> Dict[str, Any]:
        """マルチエージェントシステムを実行"""
        initial_state = {
            "messages": [f"タスク開始: {task}"],
            "task": task,
            "subtasks": [],
            "results": {},
            "current_agent": "research",
            "completed": False
        }
        
        final_state = await self.app.ainvoke(initial_state)
        return final_state["results"]["report"]

# 使用例
async def main():
    system = MultiAgentSystem()
    report = await system.run(
        "AI技術の最新トレンドについて調査し、ビジネスへの影響を分析してレポートを作成してください"
    )
    print(report)

# 実行
asyncio.run(main())

メモリ管理と状態永続化

高度なメモリシステムの実装

from langchain.memory import ConversationSummaryBufferMemory
from langchain.schema import BaseMemory
from typing import Dict, Any, List
import redis
import json
from datetime import datetime

class HybridMemory(BaseMemory):
    """短期記憶と長期記憶を組み合わせたハイブリッドメモリ"""
    
    def __init__(
        self, 
        redis_url: str,
        session_id: str,
        llm: Any,
        max_short_term_size: int = 10,
        summary_threshold: int = 5
    ):
        self.redis_client = redis.from_url(redis_url)
        self.session_id = session_id
        self.llm = llm
        self.max_short_term_size = max_short_term_size
        self.summary_threshold = summary_threshold
        
        # 短期記憶(最近の会話)
        self.short_term = []
        
        # 長期記憶(要約)
        self.long_term_key = f"memory:long_term:{session_id}"
        
        # セッション情報
        self.session_key = f"session:{session_id}"
        
    @property
    def memory_variables(self) -> List[str]:
        return ["history", "context"]
    
    def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """メモリから情報を読み込む"""
        # 長期記憶を取得
        long_term = self.redis_client.get(self.long_term_key)
        long_term_summary = long_term.decode() if long_term else ""
        
        # 短期記憶を文字列化
        short_term_text = "\n".join(self.short_term)
        
        # コンテキスト情報を取得
        context = self._get_context_info()
        
        return {
            "history": f"長期記憶:\n{long_term_summary}\n\n最近の会話:\n{short_term_text}",
            "context": context
        }
    
    def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
        """会話コンテキストを保存"""
        # 短期記憶に追加
        timestamp = datetime.now().isoformat()
        conversation = f"[{timestamp}] Human: {inputs.get('input', '')}\nAI: {outputs.get('output', '')}"
        self.short_term.append(conversation)
        
        # 短期記憶のサイズチェック
        if len(self.short_term) > self.max_short_term_size:
            self._compress_to_long_term()
        
        # セッション情報を更新
        self._update_session_info(inputs, outputs)
    
    def _compress_to_long_term(self):
        """短期記憶を要約して長期記憶に移動"""
        if len(self.short_term) < self.summary_threshold:
            return
        
        # 要約対象の会話を取得
        to_summarize = self.short_term[:self.summary_threshold]
        conversations = "\n".join(to_summarize)
        
        # LLMで要約
        summary_prompt = f"""以下の会話を要約してください。重要な情報は保持してください:
        
        {conversations}
        
        要約:"""
        
        summary = self.llm.predict(summary_prompt)
        
        # 既存の長期記憶と結合
        existing_long_term = self.redis_client.get(self.long_term_key)
        if existing_long_term:
            combined = f"{existing_long_term.decode()}\n\n{summary}"
        else:
            combined = summary
        
        # 保存
        self.redis_client.set(self.long_term_key, combined)
        
        # 短期記憶から削除
        self.short_term = self.short_term[self.summary_threshold:]
    
    def _get_context_info(self) -> str:
        """セッションのコンテキスト情報を取得"""
        session_data = self.redis_client.hgetall(self.session_key)
        if not session_data:
            return ""
        
        context = []
        for key, value in session_data.items():
            context.append(f"{key.decode()}: {value.decode()}")
        
        return "\n".join(context)
    
    def _update_session_info(self, inputs: Dict[str, Any], outputs: Dict[str, str]):
        """セッション情報を更新"""
        self.redis_client.hset(
            self.session_key,
            mapping={
                "last_interaction": datetime.now().isoformat(),
                "interaction_count": self.redis_client.hincrby(self.session_key, "interaction_count", 1)
            }
        )
    
    def clear(self) -> None:
        """メモリをクリア"""
        self.short_term = []
        self.redis_client.delete(self.long_term_key)
        self.redis_client.delete(self.session_key)

# 使用例
hybrid_memory = HybridMemory(
    redis_url="redis://localhost:6379",
    session_id="user_123",
    llm=ChatOpenAI(),
    max_short_term_size=20,
    summary_threshold=10
)

# エージェントに統合
agent_with_memory = create_react_agent(
    llm=llm,
    tools=tools,
    prompt=prompt,
    memory=hybrid_memory
)

パフォーマンス最適化

コスト削減とレスポンス改善のテクニック

パフォーマンス最適化手法の比較
最適化手法 削減効果 実装難易度 適用場面
プロンプトキャッシング 30-50% 繰り返しクエリ
モデル階層化 40-60% 複雑度に応じた処理
バッチ処理 20-30% 大量処理
ストリーミング UX改善 リアルタイム応答
セマンティックキャッシュ 50-70% 類似クエリ

実装例:インテリジェントキャッシング

from functools import lru_cache
import hashlib
from typing import Optional
import numpy as np
from sentence_transformers import SentenceTransformer

class SemanticCache:
    """セマンティック類似性に基づくキャッシュシステム"""
    
    def __init__(self, similarity_threshold: float = 0.85):
        self.cache = {}
        self.embeddings = {}
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.similarity_threshold = similarity_threshold
    
    def _get_embedding(self, text: str) -> np.ndarray:
        """テキストの埋め込みを取得"""
        return self.model.encode(text)
    
    def _calculate_similarity(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
        """コサイン類似度を計算"""
        return np.dot(embedding1, embedding2) / (np.linalg.norm(embedding1) * np.linalg.norm(embedding2))
    
    def get(self, query: str) -> Optional[str]:
        """キャッシュから類似クエリの結果を取得"""
        query_embedding = self._get_embedding(query)
        
        for cached_query, cached_embedding in self.embeddings.items():
            similarity = self._calculate_similarity(query_embedding, cached_embedding)
            
            if similarity >= self.similarity_threshold:
                print(f"キャッシュヒット! 類似度: {similarity:.2f}")
                return self.cache[cached_query]
        
        return None
    
    def set(self, query: str, result: str):
        """クエリと結果をキャッシュに保存"""
        embedding = self._get_embedding(query)
        self.embeddings[query] = embedding
        self.cache[query] = result

# モデル階層化システム
class ModelHierarchy:
    """複雑度に応じて適切なモデルを選択"""
    
    def __init__(self):
        self.models = {
            "simple": ChatOpenAI(model="gpt-3.5-turbo", temperature=0),
            "medium": ChatOpenAI(model="gpt-4", temperature=0),
            "complex": ChatOpenAI(model="gpt-4-turbo", temperature=0)
        }
        self.classifier = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
    
    def classify_complexity(self, task: str) -> str:
        """タスクの複雑度を分類"""
        prompt = f"""以下のタスクの複雑度を評価してください:
        
        タスク: {task}
        
        分類:
        - simple: 基本的な質問応答、簡単な計算
        - medium: 複数ステップの推論、中程度の分析
        - complex: 高度な分析、創造的タスク、専門知識が必要
        
        回答は'simple'、'medium'、'complex'のいずれかで:"""
        
        response = self.classifier.predict(prompt).strip().lower()
        return response if response in self.models else "medium"
    
    def process(self, task: str) -> str:
        """タスクを適切なモデルで処理"""
        complexity = self.classify_complexity(task)
        model = self.models[complexity]
        
        print(f"タスク複雑度: {complexity} - {model.model_name}を使用")
        return model.predict(task)

# 最適化されたエージェント
class OptimizedAgent:
    def __init__(self):
        self.semantic_cache = SemanticCache()
        self.model_hierarchy = ModelHierarchy()
        self.batch_queue = []
        self.batch_size = 5
    
    async def process_request(self, request: str) -> str:
        """最適化された処理"""
        # 1. キャッシュチェック
        cached_result = self.semantic_cache.get(request)
        if cached_result:
            return cached_result
        
        # 2. モデル選択と処理
        result = self.model_hierarchy.process(request)
        
        # 3. キャッシュ保存
        self.semantic_cache.set(request, result)
        
        return result
    
    async def process_batch(self, requests: List[str]) -> List[str]:
        """バッチ処理で効率化"""
        # 類似リクエストをグループ化
        groups = self._group_similar_requests(requests)
        results = []
        
        for group in groups:
            # グループごとに最適なモデルで処理
            complexity = self.model_hierarchy.classify_complexity(group[0])
            model = self.model_hierarchy.models[complexity]
            
            # バッチ処理
            batch_results = await model.abatch(group)
            results.extend(batch_results)
        
        return results
    
    def _group_similar_requests(self, requests: List[str]) -> List[List[str]]:
        """類似リクエストをグループ化"""
        # 実装は省略(埋め込みベースのクラスタリング)
        return [requests]  # 簡略化
コスト削減率 90 %

実践例:カスタマーサポートエージェント

完全なカスタマーサポートシステムの実装

from typing import Dict, List, Optional
from datetime import datetime
import asyncio
from enum import Enum

class TicketPriority(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"

class CustomerSupportAgent:
    """高度なカスタマーサポートエージェント"""
    
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4-turbo")
        self.memory = HybridMemory(
            redis_url="redis://localhost:6379",
            session_id="support_agent",
            llm=self.llm
        )
        self.tools = self._setup_tools()
        self.workflow = self._setup_workflow()
    
    def _setup_tools(self) -> List[Tool]:
        """ツールのセットアップ"""
        return [
            StructuredTool.from_function(
                func=self.search_knowledge_base,
                name="search_kb",
                description="社内ナレッジベースを検索"
            ),
            StructuredTool.from_function(
                func=self.check_order_status,
                name="check_order",
                description="注文状況を確認"
            ),
            StructuredTool.from_function(
                func=self.create_ticket,
                name="create_ticket",
                description="サポートチケットを作成"
            ),
            StructuredTool.from_function(
                func=self.escalate_to_human,
                name="escalate",
                description="人間のオペレーターにエスカレーション"
            )
        ]
    
    def _setup_workflow(self) -> StateGraph:
        """ワークフローの構築"""
        workflow = StateGraph(Dict)
        
        # ノード定義
        workflow.add_node("classify", self.classify_inquiry)
        workflow.add_node("search", self.search_solution)
        workflow.add_node("respond", self.generate_response)
        workflow.add_node("followup", self.check_satisfaction)
        
        # 条件付きエッジ
        workflow.add_conditional_edges(
            "classify",
            self.route_based_on_classification,
            {
                "simple": "search",
                "complex": "escalate",
                "order": "check_order"
            }
        )
        
        return workflow.compile()
    
    async def classify_inquiry(self, state: Dict) -> Dict:
        """問い合わせを分類"""
        inquiry = state["inquiry"]
        
        classification_prompt = f"""
        以下の問い合わせを分類してください:
        
        問い合わせ: {inquiry}
        
        カテゴリ:
        - simple: FAQ、一般的な質問
        - complex: 技術的問題、複雑なケース
        - order: 注文関連
        - complaint: クレーム
        
        優先度も判定してください(low/medium/high/urgent)
        
        JSON形式で回答:
        """
        
        response = await self.llm.ainvoke(classification_prompt)
        classification = json.loads(response.content)
        
        state["classification"] = classification["category"]
        state["priority"] = classification["priority"]
        
        return state
    
    async def search_solution(self, state: Dict) -> Dict:
        """解決策を検索"""
        inquiry = state["inquiry"]
        
        # ナレッジベース検索
        kb_results = await self.search_knowledge_base(inquiry)
        
        # 類似の過去事例を検索
        similar_cases = await self.find_similar_cases(inquiry)
        
        state["kb_results"] = kb_results
        state["similar_cases"] = similar_cases
        
        return state
    
    async def generate_response(self, state: Dict) -> Dict:
        """応答を生成"""
        context = {
            "inquiry": state["inquiry"],
            "kb_results": state.get("kb_results", ""),
            "similar_cases": state.get("similar_cases", ""),
            "customer_history": state.get("customer_history", "")
        }
        
        response_prompt = f"""
        以下の情報を基に、顧客への丁寧で有益な応答を生成してください:
        
        問い合わせ: {context['inquiry']}
        
        ナレッジベース: {context['kb_results']}
        
        類似事例: {context['similar_cases']}
        
        応答:
        """
        
        response = await self.llm.ainvoke(response_prompt)
        state["response"] = response.content
        
        return state
    
    async def check_satisfaction(self, state: Dict) -> Dict:
        """顧客満足度をチェック"""
        # 実際は顧客からのフィードバックを待つ
        state["satisfaction_check"] = True
        return state
    
    def route_based_on_classification(self, state: Dict) -> str:
        """分類に基づいてルーティング"""
        classification = state["classification"]
        priority = state["priority"]
        
        if priority == "urgent" or classification == "complaint":
            return "escalate"
        elif classification == "order":
            return "check_order"
        else:
            return "search"
    
    async def search_knowledge_base(self, query: str) -> str:
        """ナレッジベースを検索(ベクトルDB使用)"""
        # ChromaDBやPineconeを使用した実装
        from chromadb import Client
        
        client = Client()
        collection = client.get_collection("support_kb")
        
        results = collection.query(
            query_texts=[query],
            n_results=3
        )
        
        return "\n".join(results["documents"][0])
    
    async def check_order_status(self, order_id: str) -> Dict:
        """注文状況を確認"""
        # 実際のAPI呼び出し
        return {
            "order_id": order_id,
            "status": "配送中",
            "estimated_delivery": "2025-06-22",
            "tracking_number": "JP123456789"
        }
    
    async def create_ticket(self, **kwargs) -> str:
        """サポートチケットを作成"""
        ticket = {
            "id": f"TICKET-{datetime.now().strftime('%Y%m%d%H%M%S')}",
            "created_at": datetime.now().isoformat(),
            **kwargs
        }
        
        # データベースに保存
        # ...
        
        return ticket["id"]
    
    async def escalate_to_human(self, state: Dict) -> Dict:
        """人間のオペレーターにエスカレーション"""
        ticket_id = await self.create_ticket(
            inquiry=state["inquiry"],
            priority=state["priority"],
            classification=state["classification"],
            ai_attempted=True
        )
        
        state["escalated"] = True
        state["ticket_id"] = ticket_id
        state["response"] = f"""申し訳ございません。この問題は専門スタッフが対応させていただきます。
        
        チケット番号: {ticket_id}
        優先度: {state['priority']}
        
        まもなく担当者からご連絡させていただきます。"""
        
        return state
    
    async def handle_inquiry(self, inquiry: str, customer_id: Optional[str] = None) -> str:
        """問い合わせを処理"""
        initial_state = {
            "inquiry": inquiry,
            "customer_id": customer_id,
            "timestamp": datetime.now().isoformat()
        }
        
        # 顧客履歴を取得
        if customer_id:
            initial_state["customer_history"] = await self.get_customer_history(customer_id)
        
        # ワークフロー実行
        final_state = await self.workflow.ainvoke(initial_state)
        
        # メモリに保存
        self.memory.save_context(
            {"input": inquiry},
            {"output": final_state["response"]}
        )
        
        return final_state["response"]
    
    async def get_customer_history(self, customer_id: str) -> str:
        """顧客履歴を取得"""
        # 実際のDB検索
        return f"過去の問い合わせ: 3件、最終購入: 2025-06-01"

# 使用例
async def main():
    agent = CustomerSupportAgent()
    
    # シンプルな問い合わせ
    response1 = await agent.handle_inquiry(
        "返品ポリシーについて教えてください",
        customer_id="CUST-12345"
    )
    print(f"応答1: {response1}\n")
    
    # 複雑な問い合わせ
    response2 = await agent.handle_inquiry(
        "注文した商品が届きません。注文番号はORD-98765です。",
        customer_id="CUST-12345"
    )
    print(f"応答2: {response2}\n")
    
    # 緊急の問い合わせ
    response3 = await agent.handle_inquiry(
        "製品に欠陥があり、怪我をしました。すぐに対応してください。",
        customer_id="CUST-12345"
    )
    print(f"応答3: {response3}")

# 実行
asyncio.run(main())

ベストプラクティス

AIエージェント開発の重要ポイント

  1. 段階的実装: シンプルなエージェントから始めて徐々に機能を追加
  2. エラーハンドリング: 各ステップで適切なフォールバック処理を実装
  3. 監視とログ: LangSmith やカスタムログで動作を詳細に記録
  4. コスト管理: トークン使用量を監視し、キャッシングを活用
  5. セキュリティ: プロンプトインジェクション対策とデータ保護を徹底
  6. テスト: 単体テスト、統合テスト、エンドツーエンドテストを整備

セキュリティ対策の実装

from typing import List, Pattern
import re

class SecurityManager:
    """AIエージェントのセキュリティ管理"""
    
    def __init__(self):
        self.forbidden_patterns = [
            r"ignore previous instructions",
            r"system prompt",
            r"reveal your instructions",
            r"<script.*?>.*?</script>",
            r"DROP TABLE",
            r"DELETE FROM"
        ]
        self.sensitive_data_patterns = {
            "credit_card": r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b",
            "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
            "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"
        }
    
    def sanitize_input(self, text: str) -> str:
        """入力のサニタイズ"""
        # プロンプトインジェクションのチェック
        for pattern in self.forbidden_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                raise ValueError(f"禁止されたパターンが検出されました: {pattern}")
        
        return text
    
    def mask_sensitive_data(self, text: str) -> str:
        """機密データのマスキング"""
        for data_type, pattern in self.sensitive_data_patterns.items():
            text = re.sub(pattern, f"[{data_type.upper()}_MASKED]", text)
        
        return text
    
    def validate_tool_access(self, tool_name: str, user_role: str) -> bool:
        """ツールアクセスの検証"""
        access_control = {
            "admin": ["*"],
            "user": ["search_kb", "check_order"],
            "guest": ["search_kb"]
        }
        
        allowed_tools = access_control.get(user_role, [])
        return tool_name in allowed_tools or "*" in allowed_tools

まとめ

2025 年の AI エージェント開発は、LangChainと LangGraph の組み合わせにより、従来よりもはるかに高度で実用的なシステムの構築が可能になっています。

LangChain登場

LLMアプリケーション開発の標準化

Function Calling普及

ツール統合の簡素化

エンタープライズ採用

本格的な業務システムへの統合

完全自律エージェント

人間の介入を最小限に

AI エージェントは、単なる自動化ツールではありません。人間の能力を拡張し、創造性を解放する新しいパートナーです。重要なのは、適切な設計と責任ある実装です。

Andrew Ng AI研究者・教育者

本記事で紹介した技術とベストプラクティスを活用することで、実用的で高性能な AI エージェントを構築できます。継続的な改善と最新技術の採用により、より洗練されたシステムへと進化させていくことが重要です。

Rinaのプロフィール画像

Rina

Daily Hack 編集長

フルスタックエンジニアとして10年以上の経験を持つ。 大手IT企業やスタートアップでの開発経験を活かし、 実践的で即効性のある技術情報を日々発信中。 特にWeb開発、クラウド技術、AI活用に精通。

この記事は役に立ちましたか?

あなたのフィードバックが記事の改善に役立ちます

この記事は役に立ちましたか?

Daily Hackでは、開発者の皆様に役立つ情報を毎日発信しています。