Issue #139: セキュリティ: 認証・認可システムの実装
Opened 2025/8/4 by @nyasuto Open
enhancement
Description
概要
現在オープンアクセスのMoryに、セキュアな認証・認可システムを実装
背景
- 現状: 認証なしでAPIアクセス可能
- 課題: マルチユーザー環境での運用不可
- リスク: 個人情報・記憶データの保護が不十分
セキュリティ要件
1. 認証システム
# JWT (JSON Web Token) ベース認証
POST /api/auth/login
{
"username": "user@example.com",
"password": "secure_password"
}
# Response
{
"access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...",
"token_type": "bearer",
"expires_in": 3600,
"refresh_token": "refresh_token_here"
}
2. ユーザー管理
# ユーザー登録
POST /api/auth/register
{
"email": "user@example.com",
"password": "secure_password",
"display_name": "山田太郎"
}
# パスワードリセット
POST /api/auth/reset-password
{
"email": "user@example.com"
}
3. 認可システム
# リソースベース認可
@require_auth
@require_ownership # 自分のメモリのみアクセス可能
async def get_memory(memory_id: str, current_user: User):
memory = await get_memory_by_id(memory_id)
if memory.user_id \!= current_user.id:
raise HTTPException(403, "Access denied")
return memory
データベース拡張
ユーザーテーブル
CREATE TABLE users (
id TEXT PRIMARY KEY,
email TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
display_name TEXT,
is_active BOOLEAN DEFAULT true,
is_verified BOOLEAN DEFAULT false,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
last_login DATETIME
);
-- メモリテーブルにユーザーIDを追加
ALTER TABLE memories ADD COLUMN user_id TEXT REFERENCES users(id);
CREATE INDEX idx_memories_user_id ON memories(user_id);
セッション管理
CREATE TABLE user_sessions (
id TEXT PRIMARY KEY,
user_id TEXT REFERENCES users(id),
refresh_token_hash TEXT,
device_info TEXT,
ip_address TEXT,
expires_at DATETIME,
created_at DATETIME
);
セキュリティ機能
1. パスワードセキュリティ
# bcrypt による安全なハッシュ化
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def hash_password(password: str) -> str:
return pwd_context.hash(password)
2. レート制限
# Redis ベースレート制限
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.post("/api/auth/login")
@limiter.limit("5/minute") # 1分間に5回まで
async def login(request: Request, credentials: UserLogin):
pass
3. セキュリティヘッダー
# CORS・セキュリティヘッダー設定
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["https://yourdomain.com"],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
)
# セキュリティヘッダー
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
response = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
return response
4. API Key管理
# 外部API保護
class SecureConfig:
def __init__(self):
self.openai_api_key = self._get_encrypted_secret("OPENAI_API_KEY")
def _get_encrypted_secret(self, key: str) -> str:
"""暗号化された秘密鍵の復号化"""
encrypted_value = os.getenv(f"{key}_ENCRYPTED")
return decrypt_with_key(encrypted_value, self.master_key)
MCP統合セキュリティ
クライアント認証
# MCP クライアント認証
class SecureMCPServer:
def __init__(self):
self.allowed_clients = self._load_client_certificates()
async def authenticate_client(self, client_cert: str) -> bool:
"""MCPクライアント証明書検証"""
return client_cert in self.allowed_clients
データ暗号化
# 記憶データの暗号化(オプション)
from cryptography.fernet import Fernet
class MemoryEncryption:
def __init__(self, user_key: str):
self.cipher = Fernet(user_key.encode())
def encrypt_value(self, value: str) -> str:
return self.cipher.encrypt(value.encode()).decode()
def decrypt_value(self, encrypted_value: str) -> str:
return self.cipher.decrypt(encrypted_value.encode()).decode()
監査・ログ
セキュリティイベントログ
# 監査ログ
@app.middleware("http")
async def audit_log(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
# セキュリティ関連イベントのログ
if request.url.path.startswith("/api/auth/"):
await log_security_event({
"event_type": "authentication",
"user_ip": request.client.host,
"endpoint": request.url.path,
"status_code": response.status_code,
"timestamp": datetime.utcnow()
})
return response
異常検知
# 不審な活動の検知
class SecurityMonitor:
async def detect_brute_force(self, ip: str) -> bool:
"""ブルートフォース攻撃検知"""
failed_attempts = await self.get_failed_attempts(ip)
return failed_attempts > 10
async def detect_unusual_access(self, user_id: str) -> bool:
"""異常なアクセスパターン検知"""
pass
実装優先度
Phase 1: 基本認証 (高優先度)
- JWT認証システム
- ユーザー登録・ログイン
- メモリデータの所有権管理
Phase 2: セキュリティ強化 (中優先度)
- レート制限
- セキュリティヘッダー
- 監査ログ
Phase 3: 高度なセキュリティ (低優先度)
- データ暗号化
- 異常検知
- セキュリティダッシュボード
設定例
# 認証設定
JWT_SECRET_KEY=your-super-secret-jwt-key
JWT_ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
# セキュリティ設定
ALLOWED_ORIGINS=https://yourdomain.com,https://localhost:3000
ENABLE_RATE_LIMITING=true
REDIS_URL=redis://localhost:6379
# 暗号化設定(オプション)
ENCRYPTION_ENABLED=false
MASTER_ENCRYPTION_KEY=your-master-key
期待効果
- セキュリティ: 個人データ保護の確保
- スケーラビリティ: マルチユーザー対応
- 信頼性: エンタープライズレベルの安全性
- コンプライアンス: データ保護規制への対応
Comments
コメント機能は現在実装されていません。
GitHub API の comments エンドポイントを統合する予定です。
🤖 AI分析
分類結果
🔒 セキュリティ
70%
🟡 中
80%
77 スコア
カテゴリ 50
優先度 27
0 適用されたルール
Enhanced Security Detection
70%
• Body contains keyword: "security"• Body contains keyword: "encryption"
securityencryptionauthentication
Enhanced Feature Request Detection
64%
• Has matching label: "enhancement"
Details
Assignees:
None
Milestone:
None
Created:
2025/8/4
Updated:
2025/8/4