LangGraph実践ガイド 2025 - 複雑なAIワークフローを構築する最新手法
LangGraphを使った高度なAIエージェントとワークフローの構築方法を徹底解説。ステートフルなマルチエージェントシステム、Human-in-the-Loop、本番環境へのデプロイまで、実践的なコード例と共に紹介します。
LangChainとLangGraphを使用した自律型AIエージェントの設計・実装方法を徹底解説。Function Calling、マルチエージェントシステム、メモリ管理など、2025年最新の実践的なテクニックを豊富なコード例とともに紹介します。
2025 年、AI エージェントの実用化が急速に進んでいます。単純なチャットボットを超えて、複雑なタスクを自律的に実行できる AI エージェントは、ビジネスプロセスの自動化に革命をもたらしています。本記事では、LangChainと LangGraph を活用した実践的な AI エージェント開発手法を、豊富な実装例とともに解説します。
AI エージェントは、単なる質問応答システムから、複雑なタスクを自律的に実行できるシステムへと進化しています。2025 年現在、以下のような特徴を持つエージェントが実用化されています:
特徴 | 説明 | 活用例 |
---|---|---|
自律的タスク実行 | 人間の介入なしに複数ステップのタスクを完遂 | データ分析、レポート作成 |
コンテキスト理解 | 長期的な会話履歴と状態を保持 | カスタマーサポート、個人秘書 |
ツール統合 | 外部API・DBとの連携 | リアルタイムデータ取得、自動化 |
協調動作 | 複数エージェントの連携 | 複雑な問題解決、分散処理 |
チャートを読み込み中...
# LangChain: 直線的な処理フロー
from langchain import LLMChain
chain = LLMChain(
llm=llm,
prompt=prompt,
memory=memory
)
result = chain.run("タスクを実行")
# LangGraph: 状態を持つ複雑なフロー
from langgraph import StateGraph
workflow = StateGraph(AgentState)
workflow.add_node("analyze", analyze_task)
workflow.add_node("execute", execute_task)
workflow.add_edge("analyze", "execute")
app = workflow.compile()
result = app.invoke({"task": "複雑なタスク"})
# LangChain: 直線的な処理フロー
from langchain import LLMChain
chain = LLMChain(
llm=llm,
prompt=prompt,
memory=memory
)
result = chain.run("タスクを実行")
# LangGraph: 状態を持つ複雑なフロー
from langgraph import StateGraph
workflow = StateGraph(AgentState)
workflow.add_node("analyze", analyze_task)
workflow.add_node("execute", execute_task)
workflow.add_edge("analyze", "execute")
app = workflow.compile()
result = app.invoke({"task": "複雑なタスク"})
まず、必要なライブラリをインストールします:
# 基本的なインストール
pip install langchain langgraph langchain-openai langchain-anthropic
# 追加ツール
pip install chromadb faiss-cpu redis python-dotenv
# 開発ツール
pip install jupyter notebook pytest black
# .env ファイル
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
REDIS_URL=redis://localhost:6379
LANGSMITH_API_KEY=your_langsmith_key
LANGSMITH_PROJECT=my-agent-project
from langchain.agents import create_react_agent
from langchain_openai import ChatOpenAI
from langchain.tools import Tool
from langchain.prompts import PromptTemplate
import os
from dotenv import load_dotenv
load_dotenv()
# LLMの初期化
llm = ChatOpenAI(
model="gpt-4-turbo",
temperature=0,
streaming=True
)
# ツールの定義
def search_database(query: str) -> str:
"""データベースから情報を検索"""
# 実際のDB検索ロジック
return f"検索結果: {query}に関する情報"
def calculate(expression: str) -> str:
"""数式を計算"""
try:
result = eval(expression)
return f"計算結果: {result}"
except:
return "計算エラー"
tools = [
Tool(
name="DatabaseSearch",
func=search_database,
description="データベースから情報を検索します"
),
Tool(
name="Calculator",
func=calculate,
description="数式を計算します"
)
]
# プロンプトテンプレート
prompt = PromptTemplate.from_template("""
あなたは有能なアシスタントです。与えられたタスクを実行するために、
利用可能なツールを適切に使用してください。
利用可能なツール:
{tools}
ツールの使用形式:
Action: ツール名
Action Input: 入力値
タスク: {input}
思考過程: {agent_scratchpad}
""")
# エージェントの作成
agent = create_react_agent(
llm=llm,
tools=tools,
prompt=prompt
)
# エージェントの実行
from langchain.agents import AgentExecutor
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True,
max_iterations=5
)
# 実行例
result = agent_executor.invoke({
"input": "データベースから2024年の売上データを検索し、前年比を計算してください"
})
print(result["output"])
from typing import List, Dict, Any
from pydantic import BaseModel, Field
from langchain.tools import StructuredTool
from langchain_openai import ChatOpenAI
import json
import requests
# 構造化された入力の定義
class EmailParams(BaseModel):
recipient: str = Field(description="メールの宛先")
subject: str = Field(description="メールの件名")
body: str = Field(description="メール本文")
attachments: List[str] = Field(default=[], description="添付ファイルのパス")
class DatabaseQuery(BaseModel):
table: str = Field(description="検索対象のテーブル名")
filters: Dict[str, Any] = Field(description="検索条件")
limit: int = Field(default=10, description="取得件数")
# ツール関数の実装
async def send_email(params: EmailParams) -> str:
"""メールを送信する"""
# 実際のメール送信ロジック
try:
# SendGrid, AWS SES等のAPIを呼び出す
response = requests.post(
"https://api.sendgrid.com/v3/mail/send",
headers={
"Authorization": f"Bearer {os.getenv('SENDGRID_API_KEY')}",
"Content-Type": "application/json"
},
json={
"personalizations": [{
"to": [{"email": params.recipient}]
}],
"from": {"email": "agent@example.com"},
"subject": params.subject,
"content": [{
"type": "text/plain",
"value": params.body
}]
}
)
return f"メール送信成功: {params.recipient}"
except Exception as e:
return f"メール送信失敗: {str(e)}"
async def query_database(params: DatabaseQuery) -> str:
"""データベースをクエリする"""
# 実際のDB接続とクエリ実行
import sqlite3
conn = sqlite3.connect('company.db')
cursor = conn.cursor()
# 動的クエリ構築(実際はSQLインジェクション対策が必要)
where_clause = " AND ".join([f"{k} = ?" for k in params.filters.keys()])
query = f"SELECT * FROM {params.table}"
if where_clause:
query += f" WHERE {where_clause}"
query += f" LIMIT {params.limit}"
cursor.execute(query, list(params.filters.values()))
results = cursor.fetchall()
conn.close()
return json.dumps(results, ensure_ascii=False)
# 構造化ツールの作成
email_tool = StructuredTool.from_function(
func=send_email,
name="send_email",
description="メールを送信します",
args_schema=EmailParams,
coroutine=send_email
)
db_tool = StructuredTool.from_function(
func=query_database,
name="query_database",
description="データベースから情報を取得します",
args_schema=DatabaseQuery,
coroutine=query_database
)
# Function Calling対応のLLM
llm = ChatOpenAI(
model="gpt-4-1106-preview",
temperature=0
).bind_tools([email_tool, db_tool])
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, Sequence
from operator import add
import asyncio
# 状態の定義
class AgentState(TypedDict):
messages: Annotated[Sequence[str], add]
task: str
subtasks: List[str]
results: Dict[str, Any]
current_agent: str
completed: bool
# 専門エージェントの定義
class ResearchAgent:
"""調査を担当するエージェント"""
def __init__(self, llm):
self.llm = llm
self.tools = [
Tool(name="web_search", func=self.web_search, description="Web検索"),
Tool(name="arxiv_search", func=self.arxiv_search, description="論文検索")
]
async def web_search(self, query: str) -> str:
# 実際のWeb検索API呼び出し
return f"Web検索結果: {query}"
async def arxiv_search(self, query: str) -> str:
# arXiv API呼び出し
return f"論文検索結果: {query}"
async def process(self, state: AgentState) -> AgentState:
"""調査タスクを実行"""
task = state["task"]
# LLMにツール使用を判断させる
response = await self.llm.ainvoke(
f"タスク: {task}\n利用可能なツール: {self.tools}\n適切なツールを使って調査してください。"
)
# 結果を状態に追加
state["results"]["research"] = response
state["messages"].append(f"調査完了: {response}")
return state
class AnalysisAgent:
"""分析を担当するエージェント"""
def __init__(self, llm):
self.llm = llm
async def process(self, state: AgentState) -> AgentState:
"""調査結果を分析"""
research_results = state["results"].get("research", "")
analysis = await self.llm.ainvoke(
f"以下の調査結果を分析してください:\n{research_results}"
)
state["results"]["analysis"] = analysis
state["messages"].append(f"分析完了: {analysis}")
return state
class ReportAgent:
"""レポート作成を担当するエージェント"""
def __init__(self, llm):
self.llm = llm
async def process(self, state: AgentState) -> AgentState:
"""最終レポートを作成"""
results = state["results"]
report = await self.llm.ainvoke(
f"""以下の情報を基に包括的なレポートを作成してください:
調査結果: {results.get('research', '')}
分析結果: {results.get('analysis', '')}
"""
)
state["results"]["report"] = report
state["messages"].append(f"レポート作成完了: {report}")
state["completed"] = True
return state
# マルチエージェントワークフローの構築
class MultiAgentSystem:
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4-turbo")
self.research_agent = ResearchAgent(self.llm)
self.analysis_agent = AnalysisAgent(self.llm)
self.report_agent = ReportAgent(self.llm)
# ワークフローグラフの構築
self.workflow = StateGraph(AgentState)
# ノードの追加
self.workflow.add_node("research", self.research_agent.process)
self.workflow.add_node("analysis", self.analysis_agent.process)
self.workflow.add_node("report", self.report_agent.process)
# エッジの定義
self.workflow.add_edge("research", "analysis")
self.workflow.add_edge("analysis", "report")
self.workflow.add_edge("report", END)
# エントリーポイント
self.workflow.set_entry_point("research")
# コンパイル
self.app = self.workflow.compile()
async def run(self, task: str) -> Dict[str, Any]:
"""マルチエージェントシステムを実行"""
initial_state = {
"messages": [f"タスク開始: {task}"],
"task": task,
"subtasks": [],
"results": {},
"current_agent": "research",
"completed": False
}
final_state = await self.app.ainvoke(initial_state)
return final_state["results"]["report"]
# 使用例
async def main():
system = MultiAgentSystem()
report = await system.run(
"AI技術の最新トレンドについて調査し、ビジネスへの影響を分析してレポートを作成してください"
)
print(report)
# 実行
asyncio.run(main())
from langchain.memory import ConversationSummaryBufferMemory
from langchain.schema import BaseMemory
from typing import Dict, Any, List
import redis
import json
from datetime import datetime
class HybridMemory(BaseMemory):
"""短期記憶と長期記憶を組み合わせたハイブリッドメモリ"""
def __init__(
self,
redis_url: str,
session_id: str,
llm: Any,
max_short_term_size: int = 10,
summary_threshold: int = 5
):
self.redis_client = redis.from_url(redis_url)
self.session_id = session_id
self.llm = llm
self.max_short_term_size = max_short_term_size
self.summary_threshold = summary_threshold
# 短期記憶(最近の会話)
self.short_term = []
# 長期記憶(要約)
self.long_term_key = f"memory:long_term:{session_id}"
# セッション情報
self.session_key = f"session:{session_id}"
@property
def memory_variables(self) -> List[str]:
return ["history", "context"]
def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""メモリから情報を読み込む"""
# 長期記憶を取得
long_term = self.redis_client.get(self.long_term_key)
long_term_summary = long_term.decode() if long_term else ""
# 短期記憶を文字列化
short_term_text = "\n".join(self.short_term)
# コンテキスト情報を取得
context = self._get_context_info()
return {
"history": f"長期記憶:\n{long_term_summary}\n\n最近の会話:\n{short_term_text}",
"context": context
}
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
"""会話コンテキストを保存"""
# 短期記憶に追加
timestamp = datetime.now().isoformat()
conversation = f"[{timestamp}] Human: {inputs.get('input', '')}\nAI: {outputs.get('output', '')}"
self.short_term.append(conversation)
# 短期記憶のサイズチェック
if len(self.short_term) > self.max_short_term_size:
self._compress_to_long_term()
# セッション情報を更新
self._update_session_info(inputs, outputs)
def _compress_to_long_term(self):
"""短期記憶を要約して長期記憶に移動"""
if len(self.short_term) < self.summary_threshold:
return
# 要約対象の会話を取得
to_summarize = self.short_term[:self.summary_threshold]
conversations = "\n".join(to_summarize)
# LLMで要約
summary_prompt = f"""以下の会話を要約してください。重要な情報は保持してください:
{conversations}
要約:"""
summary = self.llm.predict(summary_prompt)
# 既存の長期記憶と結合
existing_long_term = self.redis_client.get(self.long_term_key)
if existing_long_term:
combined = f"{existing_long_term.decode()}\n\n{summary}"
else:
combined = summary
# 保存
self.redis_client.set(self.long_term_key, combined)
# 短期記憶から削除
self.short_term = self.short_term[self.summary_threshold:]
def _get_context_info(self) -> str:
"""セッションのコンテキスト情報を取得"""
session_data = self.redis_client.hgetall(self.session_key)
if not session_data:
return ""
context = []
for key, value in session_data.items():
context.append(f"{key.decode()}: {value.decode()}")
return "\n".join(context)
def _update_session_info(self, inputs: Dict[str, Any], outputs: Dict[str, str]):
"""セッション情報を更新"""
self.redis_client.hset(
self.session_key,
mapping={
"last_interaction": datetime.now().isoformat(),
"interaction_count": self.redis_client.hincrby(self.session_key, "interaction_count", 1)
}
)
def clear(self) -> None:
"""メモリをクリア"""
self.short_term = []
self.redis_client.delete(self.long_term_key)
self.redis_client.delete(self.session_key)
# 使用例
hybrid_memory = HybridMemory(
redis_url="redis://localhost:6379",
session_id="user_123",
llm=ChatOpenAI(),
max_short_term_size=20,
summary_threshold=10
)
# エージェントに統合
agent_with_memory = create_react_agent(
llm=llm,
tools=tools,
prompt=prompt,
memory=hybrid_memory
)
最適化手法 | 削減効果 | 実装難易度 | 適用場面 |
---|---|---|---|
プロンプトキャッシング | 30-50% | 低 | 繰り返しクエリ |
モデル階層化 | 40-60% | 中 | 複雑度に応じた処理 |
バッチ処理 | 20-30% | 低 | 大量処理 |
ストリーミング | UX改善 | 中 | リアルタイム応答 |
セマンティックキャッシュ | 50-70% | 高 | 類似クエリ |
from functools import lru_cache
import hashlib
from typing import Optional
import numpy as np
from sentence_transformers import SentenceTransformer
class SemanticCache:
"""セマンティック類似性に基づくキャッシュシステム"""
def __init__(self, similarity_threshold: float = 0.85):
self.cache = {}
self.embeddings = {}
self.model = SentenceTransformer('all-MiniLM-L6-v2')
self.similarity_threshold = similarity_threshold
def _get_embedding(self, text: str) -> np.ndarray:
"""テキストの埋め込みを取得"""
return self.model.encode(text)
def _calculate_similarity(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
"""コサイン類似度を計算"""
return np.dot(embedding1, embedding2) / (np.linalg.norm(embedding1) * np.linalg.norm(embedding2))
def get(self, query: str) -> Optional[str]:
"""キャッシュから類似クエリの結果を取得"""
query_embedding = self._get_embedding(query)
for cached_query, cached_embedding in self.embeddings.items():
similarity = self._calculate_similarity(query_embedding, cached_embedding)
if similarity >= self.similarity_threshold:
print(f"キャッシュヒット! 類似度: {similarity:.2f}")
return self.cache[cached_query]
return None
def set(self, query: str, result: str):
"""クエリと結果をキャッシュに保存"""
embedding = self._get_embedding(query)
self.embeddings[query] = embedding
self.cache[query] = result
# モデル階層化システム
class ModelHierarchy:
"""複雑度に応じて適切なモデルを選択"""
def __init__(self):
self.models = {
"simple": ChatOpenAI(model="gpt-3.5-turbo", temperature=0),
"medium": ChatOpenAI(model="gpt-4", temperature=0),
"complex": ChatOpenAI(model="gpt-4-turbo", temperature=0)
}
self.classifier = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
def classify_complexity(self, task: str) -> str:
"""タスクの複雑度を分類"""
prompt = f"""以下のタスクの複雑度を評価してください:
タスク: {task}
分類:
- simple: 基本的な質問応答、簡単な計算
- medium: 複数ステップの推論、中程度の分析
- complex: 高度な分析、創造的タスク、専門知識が必要
回答は'simple'、'medium'、'complex'のいずれかで:"""
response = self.classifier.predict(prompt).strip().lower()
return response if response in self.models else "medium"
def process(self, task: str) -> str:
"""タスクを適切なモデルで処理"""
complexity = self.classify_complexity(task)
model = self.models[complexity]
print(f"タスク複雑度: {complexity} - {model.model_name}を使用")
return model.predict(task)
# 最適化されたエージェント
class OptimizedAgent:
def __init__(self):
self.semantic_cache = SemanticCache()
self.model_hierarchy = ModelHierarchy()
self.batch_queue = []
self.batch_size = 5
async def process_request(self, request: str) -> str:
"""最適化された処理"""
# 1. キャッシュチェック
cached_result = self.semantic_cache.get(request)
if cached_result:
return cached_result
# 2. モデル選択と処理
result = self.model_hierarchy.process(request)
# 3. キャッシュ保存
self.semantic_cache.set(request, result)
return result
async def process_batch(self, requests: List[str]) -> List[str]:
"""バッチ処理で効率化"""
# 類似リクエストをグループ化
groups = self._group_similar_requests(requests)
results = []
for group in groups:
# グループごとに最適なモデルで処理
complexity = self.model_hierarchy.classify_complexity(group[0])
model = self.model_hierarchy.models[complexity]
# バッチ処理
batch_results = await model.abatch(group)
results.extend(batch_results)
return results
def _group_similar_requests(self, requests: List[str]) -> List[List[str]]:
"""類似リクエストをグループ化"""
# 実装は省略(埋め込みベースのクラスタリング)
return [requests] # 簡略化
from typing import Dict, List, Optional
from datetime import datetime
import asyncio
from enum import Enum
class TicketPriority(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
URGENT = "urgent"
class CustomerSupportAgent:
"""高度なカスタマーサポートエージェント"""
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4-turbo")
self.memory = HybridMemory(
redis_url="redis://localhost:6379",
session_id="support_agent",
llm=self.llm
)
self.tools = self._setup_tools()
self.workflow = self._setup_workflow()
def _setup_tools(self) -> List[Tool]:
"""ツールのセットアップ"""
return [
StructuredTool.from_function(
func=self.search_knowledge_base,
name="search_kb",
description="社内ナレッジベースを検索"
),
StructuredTool.from_function(
func=self.check_order_status,
name="check_order",
description="注文状況を確認"
),
StructuredTool.from_function(
func=self.create_ticket,
name="create_ticket",
description="サポートチケットを作成"
),
StructuredTool.from_function(
func=self.escalate_to_human,
name="escalate",
description="人間のオペレーターにエスカレーション"
)
]
def _setup_workflow(self) -> StateGraph:
"""ワークフローの構築"""
workflow = StateGraph(Dict)
# ノード定義
workflow.add_node("classify", self.classify_inquiry)
workflow.add_node("search", self.search_solution)
workflow.add_node("respond", self.generate_response)
workflow.add_node("followup", self.check_satisfaction)
# 条件付きエッジ
workflow.add_conditional_edges(
"classify",
self.route_based_on_classification,
{
"simple": "search",
"complex": "escalate",
"order": "check_order"
}
)
return workflow.compile()
async def classify_inquiry(self, state: Dict) -> Dict:
"""問い合わせを分類"""
inquiry = state["inquiry"]
classification_prompt = f"""
以下の問い合わせを分類してください:
問い合わせ: {inquiry}
カテゴリ:
- simple: FAQ、一般的な質問
- complex: 技術的問題、複雑なケース
- order: 注文関連
- complaint: クレーム
優先度も判定してください(low/medium/high/urgent)
JSON形式で回答:
"""
response = await self.llm.ainvoke(classification_prompt)
classification = json.loads(response.content)
state["classification"] = classification["category"]
state["priority"] = classification["priority"]
return state
async def search_solution(self, state: Dict) -> Dict:
"""解決策を検索"""
inquiry = state["inquiry"]
# ナレッジベース検索
kb_results = await self.search_knowledge_base(inquiry)
# 類似の過去事例を検索
similar_cases = await self.find_similar_cases(inquiry)
state["kb_results"] = kb_results
state["similar_cases"] = similar_cases
return state
async def generate_response(self, state: Dict) -> Dict:
"""応答を生成"""
context = {
"inquiry": state["inquiry"],
"kb_results": state.get("kb_results", ""),
"similar_cases": state.get("similar_cases", ""),
"customer_history": state.get("customer_history", "")
}
response_prompt = f"""
以下の情報を基に、顧客への丁寧で有益な応答を生成してください:
問い合わせ: {context['inquiry']}
ナレッジベース: {context['kb_results']}
類似事例: {context['similar_cases']}
応答:
"""
response = await self.llm.ainvoke(response_prompt)
state["response"] = response.content
return state
async def check_satisfaction(self, state: Dict) -> Dict:
"""顧客満足度をチェック"""
# 実際は顧客からのフィードバックを待つ
state["satisfaction_check"] = True
return state
def route_based_on_classification(self, state: Dict) -> str:
"""分類に基づいてルーティング"""
classification = state["classification"]
priority = state["priority"]
if priority == "urgent" or classification == "complaint":
return "escalate"
elif classification == "order":
return "check_order"
else:
return "search"
async def search_knowledge_base(self, query: str) -> str:
"""ナレッジベースを検索(ベクトルDB使用)"""
# ChromaDBやPineconeを使用した実装
from chromadb import Client
client = Client()
collection = client.get_collection("support_kb")
results = collection.query(
query_texts=[query],
n_results=3
)
return "\n".join(results["documents"][0])
async def check_order_status(self, order_id: str) -> Dict:
"""注文状況を確認"""
# 実際のAPI呼び出し
return {
"order_id": order_id,
"status": "配送中",
"estimated_delivery": "2025-06-22",
"tracking_number": "JP123456789"
}
async def create_ticket(self, **kwargs) -> str:
"""サポートチケットを作成"""
ticket = {
"id": f"TICKET-{datetime.now().strftime('%Y%m%d%H%M%S')}",
"created_at": datetime.now().isoformat(),
**kwargs
}
# データベースに保存
# ...
return ticket["id"]
async def escalate_to_human(self, state: Dict) -> Dict:
"""人間のオペレーターにエスカレーション"""
ticket_id = await self.create_ticket(
inquiry=state["inquiry"],
priority=state["priority"],
classification=state["classification"],
ai_attempted=True
)
state["escalated"] = True
state["ticket_id"] = ticket_id
state["response"] = f"""申し訳ございません。この問題は専門スタッフが対応させていただきます。
チケット番号: {ticket_id}
優先度: {state['priority']}
まもなく担当者からご連絡させていただきます。"""
return state
async def handle_inquiry(self, inquiry: str, customer_id: Optional[str] = None) -> str:
"""問い合わせを処理"""
initial_state = {
"inquiry": inquiry,
"customer_id": customer_id,
"timestamp": datetime.now().isoformat()
}
# 顧客履歴を取得
if customer_id:
initial_state["customer_history"] = await self.get_customer_history(customer_id)
# ワークフロー実行
final_state = await self.workflow.ainvoke(initial_state)
# メモリに保存
self.memory.save_context(
{"input": inquiry},
{"output": final_state["response"]}
)
return final_state["response"]
async def get_customer_history(self, customer_id: str) -> str:
"""顧客履歴を取得"""
# 実際のDB検索
return f"過去の問い合わせ: 3件、最終購入: 2025-06-01"
# 使用例
async def main():
agent = CustomerSupportAgent()
# シンプルな問い合わせ
response1 = await agent.handle_inquiry(
"返品ポリシーについて教えてください",
customer_id="CUST-12345"
)
print(f"応答1: {response1}\n")
# 複雑な問い合わせ
response2 = await agent.handle_inquiry(
"注文した商品が届きません。注文番号はORD-98765です。",
customer_id="CUST-12345"
)
print(f"応答2: {response2}\n")
# 緊急の問い合わせ
response3 = await agent.handle_inquiry(
"製品に欠陥があり、怪我をしました。すぐに対応してください。",
customer_id="CUST-12345"
)
print(f"応答3: {response3}")
# 実行
asyncio.run(main())
from typing import List, Pattern
import re
class SecurityManager:
"""AIエージェントのセキュリティ管理"""
def __init__(self):
self.forbidden_patterns = [
r"ignore previous instructions",
r"system prompt",
r"reveal your instructions",
r"<script.*?>.*?</script>",
r"DROP TABLE",
r"DELETE FROM"
]
self.sensitive_data_patterns = {
"credit_card": r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b",
"ssn": r"\b\d{3}-\d{2}-\d{4}\b",
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"
}
def sanitize_input(self, text: str) -> str:
"""入力のサニタイズ"""
# プロンプトインジェクションのチェック
for pattern in self.forbidden_patterns:
if re.search(pattern, text, re.IGNORECASE):
raise ValueError(f"禁止されたパターンが検出されました: {pattern}")
return text
def mask_sensitive_data(self, text: str) -> str:
"""機密データのマスキング"""
for data_type, pattern in self.sensitive_data_patterns.items():
text = re.sub(pattern, f"[{data_type.upper()}_MASKED]", text)
return text
def validate_tool_access(self, tool_name: str, user_role: str) -> bool:
"""ツールアクセスの検証"""
access_control = {
"admin": ["*"],
"user": ["search_kb", "check_order"],
"guest": ["search_kb"]
}
allowed_tools = access_control.get(user_role, [])
return tool_name in allowed_tools or "*" in allowed_tools
2025 年の AI エージェント開発は、LangChainと LangGraph の組み合わせにより、従来よりもはるかに高度で実用的なシステムの構築が可能になっています。
LLMアプリケーション開発の標準化
ツール統合の簡素化
本格的な業務システムへの統合
人間の介入を最小限に
AI エージェントは、単なる自動化ツールではありません。人間の能力を拡張し、創造性を解放する新しいパートナーです。重要なのは、適切な設計と責任ある実装です。
本記事で紹介した技術とベストプラクティスを活用することで、実用的で高性能な AI エージェントを構築できます。継続的な改善と最新技術の採用により、より洗練されたシステムへと進化させていくことが重要です。