Issue #139: セキュリティ: 認証・認可システムの実装

Opened 2025/8/4 by @nyasuto Open
enhancement

Description

概要

現在オープンアクセスのMoryに、セキュアな認証・認可システムを実装

背景

  • 現状: 認証なしでAPIアクセス可能
  • 課題: マルチユーザー環境での運用不可
  • リスク: 個人情報・記憶データの保護が不十分

セキュリティ要件

1. 認証システム

# JWT (JSON Web Token) ベース認証
POST /api/auth/login
{
  "username": "user@example.com",
  "password": "secure_password"
}

# Response
{
  "access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...",
  "token_type": "bearer",
  "expires_in": 3600,
  "refresh_token": "refresh_token_here"
}

2. ユーザー管理

# ユーザー登録
POST /api/auth/register
{
  "email": "user@example.com",
  "password": "secure_password",
  "display_name": "山田太郎"
}

# パスワードリセット
POST /api/auth/reset-password
{
  "email": "user@example.com"
}

3. 認可システム

# リソースベース認可
@require_auth
@require_ownership  # 自分のメモリのみアクセス可能
async def get_memory(memory_id: str, current_user: User):
    memory = await get_memory_by_id(memory_id)
    if memory.user_id \!= current_user.id:
        raise HTTPException(403, "Access denied")
    return memory

データベース拡張

ユーザーテーブル

CREATE TABLE users (
    id TEXT PRIMARY KEY,
    email TEXT UNIQUE NOT NULL,
    password_hash TEXT NOT NULL,
    display_name TEXT,
    is_active BOOLEAN DEFAULT true,
    is_verified BOOLEAN DEFAULT false,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    last_login DATETIME
);

-- メモリテーブルにユーザーIDを追加
ALTER TABLE memories ADD COLUMN user_id TEXT REFERENCES users(id);
CREATE INDEX idx_memories_user_id ON memories(user_id);

セッション管理

CREATE TABLE user_sessions (
    id TEXT PRIMARY KEY,
    user_id TEXT REFERENCES users(id),
    refresh_token_hash TEXT,
    device_info TEXT,
    ip_address TEXT,
    expires_at DATETIME,
    created_at DATETIME
);

セキュリティ機能

1. パスワードセキュリティ

# bcrypt による安全なハッシュ化
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

def hash_password(password: str) -> str:
    return pwd_context.hash(password)

2. レート制限

# Redis ベースレート制限
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.post("/api/auth/login")
@limiter.limit("5/minute")  # 1分間に5回まで
async def login(request: Request, credentials: UserLogin):
    pass

3. セキュリティヘッダー

# CORS・セキュリティヘッダー設定
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://yourdomain.com"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)

# セキュリティヘッダー
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["X-XSS-Protection"] = "1; mode=block"
    return response

4. API Key管理

# 外部API保護
class SecureConfig:
    def __init__(self):
        self.openai_api_key = self._get_encrypted_secret("OPENAI_API_KEY")
    
    def _get_encrypted_secret(self, key: str) -> str:
        """暗号化された秘密鍵の復号化"""
        encrypted_value = os.getenv(f"{key}_ENCRYPTED")
        return decrypt_with_key(encrypted_value, self.master_key)

MCP統合セキュリティ

クライアント認証

# MCP クライアント認証
class SecureMCPServer:
    def __init__(self):
        self.allowed_clients = self._load_client_certificates()
    
    async def authenticate_client(self, client_cert: str) -> bool:
        """MCPクライアント証明書検証"""
        return client_cert in self.allowed_clients

データ暗号化

# 記憶データの暗号化(オプション)
from cryptography.fernet import Fernet

class MemoryEncryption:
    def __init__(self, user_key: str):
        self.cipher = Fernet(user_key.encode())
    
    def encrypt_value(self, value: str) -> str:
        return self.cipher.encrypt(value.encode()).decode()
    
    def decrypt_value(self, encrypted_value: str) -> str:
        return self.cipher.decrypt(encrypted_value.encode()).decode()

監査・ログ

セキュリティイベントログ

# 監査ログ
@app.middleware("http")
async def audit_log(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    
    # セキュリティ関連イベントのログ
    if request.url.path.startswith("/api/auth/"):
        await log_security_event({
            "event_type": "authentication",
            "user_ip": request.client.host,
            "endpoint": request.url.path,
            "status_code": response.status_code,
            "timestamp": datetime.utcnow()
        })
    
    return response

異常検知

# 不審な活動の検知
class SecurityMonitor:
    async def detect_brute_force(self, ip: str) -> bool:
        """ブルートフォース攻撃検知"""
        failed_attempts = await self.get_failed_attempts(ip)
        return failed_attempts > 10
    
    async def detect_unusual_access(self, user_id: str) -> bool:
        """異常なアクセスパターン検知"""
        pass

実装優先度

Phase 1: 基本認証 (高優先度)

  • JWT認証システム
  • ユーザー登録・ログイン
  • メモリデータの所有権管理

Phase 2: セキュリティ強化 (中優先度)

  • レート制限
  • セキュリティヘッダー
  • 監査ログ

Phase 3: 高度なセキュリティ (低優先度)

  • データ暗号化
  • 異常検知
  • セキュリティダッシュボード

設定例

# 認証設定
JWT_SECRET_KEY=your-super-secret-jwt-key
JWT_ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7

# セキュリティ設定
ALLOWED_ORIGINS=https://yourdomain.com,https://localhost:3000
ENABLE_RATE_LIMITING=true
REDIS_URL=redis://localhost:6379

# 暗号化設定(オプション)
ENCRYPTION_ENABLED=false
MASTER_ENCRYPTION_KEY=your-master-key

期待効果

  • セキュリティ: 個人データ保護の確保
  • スケーラビリティ: マルチユーザー対応
  • 信頼性: エンタープライズレベルの安全性
  • コンプライアンス: データ保護規制への対応

Comments

コメント機能は現在実装されていません。
GitHub API の comments エンドポイントを統合する予定です。

🤖 AI分析

分類結果

🔒 セキュリティ
🟡 中
77 スコア
カテゴリ 50
優先度 27
0

適用されたルール

Enhanced Security Detection
• Body contains keyword: "security"• Body contains keyword: "encryption"
securityencryptionauthentication
Enhanced Feature Request Detection
• Has matching label: "enhancement"

Details

Assignees:

None

Milestone:

None

Created:

2025/8/4

Updated:

2025/8/4