全RAG技術実装集2025 - シンプルで実践的なアプローチ
RAG(Retrieval-Augmented Generation)の基本から実践的な実装まで、2025年最新のベストプラクティスを徹底解説。プロトタイプから本番環境への移行方法も紹介します。
AI Engineering Hubは70以上のLLM・RAG・AIエージェント実装チュートリアルを含む包括的なリポジトリ。初心者から上級者まで、ローカルチャットボット、マルチエージェントシステム、OCR・RAG実装まで実践的なAI開発をJupyter Notebookで学習できる最高の学習リソース。
AI Engineering Hub は、LLM、RAG、AI エージェントの実装に関する包括的なチュートリアルを提供する、現在最も充実した AI 学習リソースの 1 つです。理論から実践まで、Jupyter Notebook で学べる実例が豊富に用意され、エンタープライズレベルの AI 実装パターン、ベストプラクティス、最新技術トレンドを体系的に学習できます。
AI Engineering Hub は、初心者から実務者、研究者までを対象とした、AI エンジニアリングの総合学習リソースです:
項目 | 数値 | 説明 |
---|---|---|
スター数 | 11,400+ | コミュニティからの高い評価 |
フォーク数 | 1,900+ | アクティブな開発コミュニティ |
コミット数 | 284+ | 継続的なアップデート |
プロジェクト数 | 70+ | 幅広いトピックをカバー |
主要言語 | Python 2.5% | Jupyter Notebookが主体(97.5%) |
AI Engineering Hub は以下の主要領域をカバーしています:
LLM実装と活用
RAG(検索拡張生成)システム
マルチエージェントシステム
特殊アプリケーション
LLM基礎、API使用、シンプルなチャットボット作成
ベクターデータベース、文書処理、構造化データ連携
複数AIの統合、オーケストレーション、本番運用
スケーラブルアーキテクチャ、セキュリティ、監視システム
# AI Engineering Hubリポジトリのクローン
git clone https://github.com/patchy631/ai-engineering-hub.git
cd ai-engineering-hub
# Python仮想環境の作成(推奨)
python -m venv ai-engineering-env
source ai-engineering-env/bin/activate # Linux/Mac
# ai-engineering-env\Scripts\activate # Windows
# 基本的な依存パッケージのインストール
pip install notebook pandas numpy matplotlib seaborn
pip install openai langchain chromadb
pip install sentence-transformers transformers torch
# Jupyter Notebookの起動
jupyter notebook
# またはJupyter Labを使用する場合
pip install jupyterlab
jupyter lab
ブラウザで http://localhost:8888
にアクセスし、チュートリアルを開始できます。
初心者向けの最初のプロジェクト例:
# basic_chatbot.ipynb
import openai
import os
from datetime import datetime
# OpenAI APIキーの設定
openai.api_key = os.getenv('OPENAI_API_KEY')
class SimpleChatbot:
def __init__(self, model="gpt-3.5-turbo"):
self.model = model
self.conversation_history = []
def add_system_message(self, content):
"""AIの振る舞いを設定"""
self.conversation_history.append({
"role": "system",
"content": content
})
def chat(self, user_message):
"""AIと会話する"""
# ユーザーメッセージを履歴に追加
self.conversation_history.append({
"role": "user",
"content": user_message
})
try:
# OpenAI APIへのリクエスト
response = openai.ChatCompletion.create(
model=self.model,
messages=self.conversation_history,
max_tokens=500,
temperature=0.7
)
# AIの応答を取得
ai_message = response.choices[0].message.content
# 履歴にAIの応答を追加
self.conversation_history.append({
"role": "assistant",
"content": ai_message
})
return ai_message
except Exception as e:
return f"エラーが発生しました: {str(e)}"
def display_conversation(self):
"""conversation履歴を表示"""
for message in self.conversation_history:
role = message['role']
content = message['content']
if role == 'user':
print(f"💬 ユーザー: {content}")
elif role == 'assistant':
print(f"🤖 AI: {content}")
elif role == 'system':
print(f"⚙️ システム: {content}")
print("-" * 50)
# チャットボットの初期化と使用例
bot = SimpleChatbot()
# AIの振る舞いを設定
bot.add_system_message("""
あなたはAIエンジニアリングのエキスパートです。
初心者にも分かりやすく、実践的なアドバイスを提供してください。
日本語で応答し、具体例を交えながら説明してください。
""")
# 対話の実行
user_input = "LLMとRAGの違いを教えてください"
response = bot.chat(user_input)
print(f"ユーザー: {user_input}")
print(f"AI: {response}")
# シンプルなプロンプト
prompt = "この文書を要約してください"
# 構造化されたプロンプトテンプレート
prompt_template = """
# 役割設定
あなたは優秀な文書アナリストです。
# タスク
以下の文書を分析し、指定された形式で要約してください。
# 出力形式
- 主要ポイント: 3-5項目
- 重要度: 高/中/低
- アクションアイテム: 必要に応じて
# 文書
{document_text}
# 要約結果
"""
# シンプルなプロンプト
prompt = "この文書を要約してください"
# 構造化されたプロンプトテンプレート
prompt_template = """
# 役割設定
あなたは優秀な文書アナリストです。
# タスク
以下の文書を分析し、指定された形式で要約してください。
# 出力形式
- 主要ポイント: 3-5項目
- 重要度: 高/中/低
- アクションアイテム: 必要に応じて
# 文書
{document_text}
# 要約結果
"""
# basic_rag_system.ipynb
import os
import numpy as np
from typing import List, Dict
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
class BasicRAGSystem:
def __init__(self, collection_name="documents"):
# エンベディングモデルの初期化
self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
# ChromaDBクライアントの初期化
self.chroma_client = chromadb.Client(Settings(
persist_directory="./chroma_db"
))
# コレクションの作成または取得
try:
self.collection = self.chroma_client.create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"}
)
except:
self.collection = self.chroma_client.get_collection(collection_name)
def add_documents(self, documents: List[str], document_ids: List[str] = None):
"""documentsをベクターデータベースに追加"""
if document_ids is None:
document_ids = [f"doc_{i}" for i in range(len(documents))]
# 文書をエンベディング
embeddings = self.embedding_model.encode(documents).tolist()
# ChromaDBにデータを追加
self.collection.add(
embeddings=embeddings,
documents=documents,
ids=document_ids
)
print(f"{len(documents)}件の文書を追加しました")
def search_similar_documents(self, query: str, n_results: int = 3) -> List[Dict]:
"""queryに似た文書を検索"""
# クエリをエンベディング
query_embedding = self.embedding_model.encode([query]).tolist()
# 似た文書を検索
results = self.collection.query(
query_embeddings=query_embedding,
n_results=n_results
)
# 結果を整形
formatted_results = []
for i in range(len(results['ids'][0])):
formatted_results.append({
'id': results['ids'][0][i],
'document': results['documents'][0][i],
'distance': results['distances'][0][i]
})
return formatted_results
def generate_rag_response(self, query: str, n_docs: int = 3) -> str:
"""RAGパイプラインで回答を生成"""
# 関連文書を検索
similar_docs = self.search_similar_documents(query, n_docs)
# コンテキストを構築
context = "\n\n".join([
f"文書{i+1}: {doc['document']}"
for i, doc in enumerate(similar_docs)
])
# RAGプロンプトを作成
rag_prompt = f"""
以下のコンテキスト情報を参考にして、ユーザーの質問に答えてください。
コンテキスト:
{context}
質問: {query}
回答:
"""
# ここでLLM APIを呼び出して回答を生成
# (実際の実装ではOpenAI APIなどを使用)
return {
'prompt': rag_prompt,
'context_docs': similar_docs,
'answer': 'ここにLLMからの回答が入ります'
}
# RAGシステムの使用例
rag = BasicRAGSystem()
# サンプル文書を追加
sample_documents = [
"AIエンジニアリングは機械学習モデルを実用的なシステムに統合する分野です。",
"LLM(Large Language Model)は大量のテキストデータで訓練された言語モデルです。",
"RAG(Retrieval-Augmented Generation)は検索と生成を組み合わせた手法です。",
"ベクターデータベースは高次元ベクターを効率的に保存・検索するシステムです。"
]
rag.add_documents(sample_documents)
# 検索テスト
query = "RAGについて教えてください"
results = rag.search_similar_documents(query)
print("検索結果:")
for result in results:
print(f"ID: {result['id']}, スコア: {result['distance']:.3f}")
print(f"文書: {result['document']}")
print("-" * 50)
# multi_agent_system.ipynb
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from dataclasses import dataclass
from enum import Enum
import json
import uuid
from datetime import datetime
class AgentRole(Enum):
COORDINATOR = "coordinator" # 全体の調整と指示
RESEARCHER = "researcher" # 情報収集と分析
ANALYST = "analyst" # データ分析と洞察
WRITER = "writer" # コンテンツ作成と整形
REVIEWER = "reviewer" # 品質チェックと改善提案
@dataclass
class Message:
id: str
sender_id: str
receiver_id: str
content: str
message_type: str
timestamp: datetime
metadata: Dict[str, Any] = None
class BaseAgent(ABC):
def __init__(self, agent_id: str, role: AgentRole, name: str):
self.agent_id = agent_id
self.role = role
self.name = name
self.memory = [] # メッセージ履歴
self.status = "idle" # idle, working, waiting
@abstractmethod
def process_message(self, message: Message) -> List[Message]:
"""messageを処理し、必要に応じて応答メッセージを返す"""
pass
def add_to_memory(self, message: Message):
"""memoryにmessageを追加"""
self.memory.append(message)
# memoryのサイズ制限を実装する場合
if len(self.memory) > 100:
self.memory = self.memory[-50:] # 最新50件を保持
class ResearchAgent(BaseAgent):
def __init__(self, agent_id: str):
super().__init__(agent_id, AgentRole.RESEARCHER, "Research Specialist")
self.expertise = ["web_search", "data_collection", "fact_checking"]
def process_message(self, message: Message) -> List[Message]:
self.add_to_memory(message)
if "research" in message.content.lower():
# 研究タスクを実行
research_result = self._conduct_research(message.content)
response = Message(
id=str(uuid.uuid4()),
sender_id=self.agent_id,
receiver_id=message.sender_id,
content=research_result,
message_type="research_report",
timestamp=datetime.now(),
metadata={"research_query": message.content}
)
return [response]
return []
def _conduct_research(self, query: str) -> str:
# 実際の実装では、ここでWeb検索やAPI呼び出しを行う
return f"「{query}」に関する研究結果をここに返します。"
class AnalystAgent(BaseAgent):
def __init__(self, agent_id: str):
super().__init__(agent_id, AgentRole.ANALYST, "Data Analyst")
self.expertise = ["data_analysis", "pattern_recognition", "insights"]
def process_message(self, message: Message) -> List[Message]:
self.add_to_memory(message)
if message.message_type == "research_report":
# 研究結果を分析
analysis_result = self._analyze_data(message.content)
response = Message(
id=str(uuid.uuid4()),
sender_id=self.agent_id,
receiver_id="coordinator", # コーディネーターに送信
content=analysis_result,
message_type="analysis_report",
timestamp=datetime.now(),
metadata={"analyzed_data": message.id}
)
return [response]
return []
def _analyze_data(self, data: str) -> str:
# 実際の実装では、データ分析、パターン認識等を行う
return f"データ分析結果: {data[:100]}...の主要なパターンと洞察"
class MultiAgentSystem:
def __init__(self):
self.agents: Dict[str, BaseAgent] = {}
self.message_queue: List[Message] = []
self.coordinator_id = "coordinator"
def add_agent(self, agent: BaseAgent):
"""systemにAgentを追加"""
self.agents[agent.agent_id] = agent
def send_message(self, sender_id: str, receiver_id: str, content: str, message_type: str = "general"):
"""agent間でmessageを送信"""
message = Message(
id=str(uuid.uuid4()),
sender_id=sender_id,
receiver_id=receiver_id,
content=content,
message_type=message_type,
timestamp=datetime.now()
)
self.message_queue.append(message)
return message.id
def process_messages(self):
"""queue内の全messageを処理"""
while self.message_queue:
message = self.message_queue.pop(0)
if message.receiver_id in self.agents:
receiver = self.agents[message.receiver_id]
responses = receiver.process_message(message)
# 応答messageをqueueに追加
for response in responses:
self.message_queue.append(response)
def get_agent_status(self) -> Dict[str, str]:
"""All Agentのステータスを取得"""
return {agent_id: agent.status for agent_id, agent in self.agents.items()}
# Multi-Agentシステムの使用例
system = MultiAgentSystem()
# Agentを作成してシステムに追加
researcher = ResearchAgent("researcher_01")
analyst = AnalystAgent("analyst_01")
system.add_agent(researcher)
system.add_agent(analyst)
# タスクを開始
system.send_message(
sender_id="user",
receiver_id="researcher_01",
content="AIエンジニアリングの最新トレンドを研究してください",
message_type="research_request"
)
# messageを処理
system.process_messages()
# Agentのステータスを確認
print("エージェントステータス:")
for agent_id, status in system.get_agent_status().items():
print(f"{agent_id}: {status}")
# scalable_ai_architecture.py
from typing import Dict, List, Optional
import asyncio
import aiohttp
from dataclasses import dataclass
from enum import Enum
class DeploymentStage(Enum):
DEVELOPMENT = "development"
STAGING = "staging"
PRODUCTION = "production"
ENTERPRISE = "enterprise"
@dataclass
class AIServiceConfig:
service_name: str
model_endpoint: str
max_requests_per_minute: int
timeout_seconds: int
retry_attempts: int
fallback_model: Optional[str] = None
class ScalableAISystem:
def __init__(self, stage: DeploymentStage):
self.stage = stage
self.services = self._configure_services()
self.load_balancer = AILoadBalancer()
self.monitor = AIMonitor()
def _configure_services(self) -> Dict[str, AIServiceConfig]:
"""deployment stageに応じてserviceを設定"""
base_configs = {
DeploymentStage.DEVELOPMENT: {
"llm_service": AIServiceConfig(
service_name="local_llm",
model_endpoint="http://localhost:8000",
max_requests_per_minute=60,
timeout_seconds=30,
retry_attempts=2
)
},
DeploymentStage.PRODUCTION: {
"llm_service": AIServiceConfig(
service_name="production_llm",
model_endpoint="https://api.openai.com/v1",
max_requests_per_minute=1000,
timeout_seconds=15,
retry_attempts=3,
fallback_model="gpt-3.5-turbo"
)
},
DeploymentStage.ENTERPRISE: {
"llm_service": AIServiceConfig(
service_name="enterprise_llm",
model_endpoint="https://enterprise-api.company.com",
max_requests_per_minute=10000,
timeout_seconds=10,
retry_attempts=5,
fallback_model="claude-3-sonnet"
)
}
}
return base_configs.get(self.stage, base_configs[DeploymentStage.DEVELOPMENT])
class AILoadBalancer:
def __init__(self):
self.request_counts: Dict[str, int] = {}
self.health_status: Dict[str, bool] = {}
async def route_request(self, service_name: str, request_data: Dict) -> Dict:
"""requestを最適なserviceにルーティング"""
# ロードバランシングロジックを実装
return await self._send_request(service_name, request_data)
async def _send_request(self, service_name: str, request_data: Dict) -> Dict:
# 非同期HTTP requestの実装
return {"status": "success", "data": "mocked_response"}
class AIMonitor:
def __init__(self):
self.metrics: Dict[str, List[float]] = {}
self.alerts: List[str] = []
def log_request(self, service_name: str, response_time: float, status: str):
"""requestのmetricsを記録"""
if service_name not in self.metrics:
self.metrics[service_name] = []
self.metrics[service_name].append(response_time)
# アラートのチェック
if response_time > 5.0: # 5秒以上の応答時間
self.alerts.append(f"{service_name}: Slow response ({response_time:.2f}s)")
def get_performance_report(self) -> Dict[str, Dict[str, float]]:
"""performanceレポートを生成"""
report = {}
for service, times in self.metrics.items():
if times:
report[service] = {
"avg_response_time": sum(times) / len(times),
"max_response_time": max(times),
"min_response_time": min(times),
"total_requests": len(times)
}
return report
# enterprise_ai_security.py
import hashlib
import hmac
import secrets
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import logging
from dataclasses import dataclass
@dataclass
class SecurityPolicy:
encryption_required: bool = True
audit_logging: bool = True
data_retention_days: int = 90
pii_detection: bool = True
access_control: bool = True
class EnterpriseAISecurity:
def __init__(self, policy: SecurityPolicy):
self.policy = policy
self.audit_log: List[Dict] = []
self.access_tokens: Dict[str, datetime] = {}
# loggingの設定
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
def generate_access_token(self, user_id: str, permissions: List[str]) -> str:
"""secure access tokenを生成"""
token_data = {
"user_id": user_id,
"permissions": permissions,
"issued_at": datetime.now().isoformat(),
"expires_at": (datetime.now() + timedelta(hours=24)).isoformat()
}
# tokenをハッシュ化
token_string = f"{user_id}:{secrets.token_urlsafe(32)}"
token_hash = hashlib.sha256(token_string.encode()).hexdigest()
# tokenを保存
self.access_tokens[token_hash] = datetime.now() + timedelta(hours=24)
# audit logに記録
self.log_security_event("token_generated", user_id, {"token_hash": token_hash})
return token_hash
def validate_access(self, token: str, required_permission: str) -> bool:
"""access tokenとpermissionを検証"""
if token not in self.access_tokens:
self.log_security_event("invalid_token", "unknown", {"token": token[:8] + "..."})
return False
# tokenの有効期限をチェック
if datetime.now() > self.access_tokens[token]:
self.log_security_event("expired_token", "unknown", {"token": token[:8] + "..."})
del self.access_tokens[token]
return False
return True
def detect_pii(self, text: str) -> List[str]:
"""text内の個人情報を検出"""
pii_patterns = {
"email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
"phone": r"\d{3}-\d{4}-\d{4}",
"credit_card": r"\d{4}-\d{4}-\d{4}-\d{4}"
}
detected_pii = []
for pii_type, pattern in pii_patterns.items():
import re
if re.search(pattern, text):
detected_pii.append(pii_type)
self.log_security_event("pii_detected", "system", {"type": pii_type})
return detected_pii
def sanitize_data(self, data: str) -> str:
"""dataをサニタイズして個人情報を除去"""
sanitized = data
# emailをmask
import re
sanitized = re.sub(
r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
"[EMAIL_REDACTED]",
sanitized
)
# 電話番号をmask
sanitized = re.sub(
r"\d{3}-\d{4}-\d{4}",
"[PHONE_REDACTED]",
sanitized
)
return sanitized
def log_security_event(self, event_type: str, user_id: str, metadata: Dict):
"""security eventをaudit logに記録"""
event = {
"timestamp": datetime.now().isoformat(),
"event_type": event_type,
"user_id": user_id,
"metadata": metadata
}
self.audit_log.append(event)
self.logger.info(f"Security Event: {event_type} - User: {user_id}")
# audit logのサイズ制限
if len(self.audit_log) > 10000:
self.audit_log = self.audit_log[-5000:] # 最新5000件を保持
def get_security_report(self) -> Dict:
"""securityレポートを生成"""
event_counts = {}
for event in self.audit_log:
event_type = event["event_type"]
event_counts[event_type] = event_counts.get(event_type, 0) + 1
return {
"total_events": len(self.audit_log),
"event_types": event_counts,
"active_tokens": len(self.access_tokens),
"policy": self.policy.__dict__
}
# Enterprise Securityの使用例
security_policy = SecurityPolicy(
encryption_required=True,
audit_logging=True,
data_retention_days=365,
pii_detection=True,
access_control=True
)
security_manager = EnterpriseAISecurity(security_policy)
# access tokenの生成
token = security_manager.generate_access_token(
user_id="admin@company.com",
permissions=["read", "write", "admin"]
)
# PII検出のテスト
test_data = "ユーザーのメールアドレスはjohn.doe@company.comで、電話番号は090-1234-5678です。"
pii_found = security_manager.detect_pii(test_data)
sanitized_data = security_manager.sanitize_data(test_data)
print(f"PII検出結果: {pii_found}")
print(f"サニタイズ後: {sanitized_data}")
# securityレポートの生成
report = security_manager.get_security_report()
print(f"Security Report: {report}")
コンポーネント | 技術 | 役割 | 成果 |
---|---|---|---|
RAGシステム | ChromaDB + OpenAI | ナレッジベース検索 | 問い合わせ数90%減 |
マルチエージェント | LangChain + Custom | タスク分担と自動化 | 応答時間75%短縮 |
感情分析 | Transformers | 顧客感情理解 | 満足度スコア30%向上 |
コスト最適化 | モデル選択最適化 | 運用コスト削減 | 月額コスト60%削減 |
AI Engineering Hub は、AI エンジニアリングの学習と実践において、最も価値の高いリソースの 1 つです:
AI エンジニアリングの世界は急速に進化していますが、AI Engineering Hub を活用することで、理論から実践までを体系的に学習し、現実のビジネス課題を解決できる AI システムを構築するスキルを習得できます。