Issue #141: 新機能: 外部サービス統合・プラグインシステム

Opened 2025/8/4 by @nyasuto Open
enhancement

Description

概要

Moryの機能を外部サービスやツールと連携させるプラグインシステムの実装

背景

  • 現状: Obsidian統合のみ
  • 課題: 多様なワークフローに対応できない
  • 需要: 既存ツールとのシームレス連携

統合対象サービス

1. ナレッジベース・ノートアプリ

# Notion統合
class NotionIntegration:
    async def sync_to_notion(self, memory: Memory) -> str:
        """メモリをNotionページに同期"""
        page = await self.notion_client.pages.create({
            "parent": {"database_id": self.database_id},
            "properties": {
                "Title": {"title": [{"text": {"content": memory.summary}}]},
                "Content": {"rich_text": [{"text": {"content": memory.value}}]},
                "Tags": {"multi_select": [{"name": tag} for tag in memory.tags]},
                "Created": {"date": {"start": memory.created_at.isoformat()}}
            }
        })
        return page["url"]

# Logseq統合
class LogseqIntegration:
    async def export_to_logseq(self, memories: List[Memory]) -> str:
        """Logseq形式でエクスポート"""
        logseq_content = ""
        for memory in memories:
            logseq_content += f"""
- {memory.summary} #{' #'.join(memory.tags)}
  :PROPERTIES:
  :id: {memory.id}
  :created: {memory.created_at}
  :END:
  {memory.value}
"""
        return logseq_content

2. タスク管理・生産性ツール

# Todoist統合
class TodoistIntegration:
    async def create_task_from_memory(self, memory: Memory) -> dict:
        """メモリからタスクを作成"""
        if "TODO" in memory.tags or "task" in memory.value.lower():
            task = await self.todoist_client.tasks.add({
                "content": memory.summary,
                "description": memory.value,
                "labels": memory.tags,
                "due_string": "today"  # AI解析で期日抽出
            })
            return task

# Google Calendar統合
class CalendarIntegration:
    async def schedule_from_memory(self, memory: Memory) -> dict:
        """メモリから予定を作成"""
        # NLP処理で日時・場所を抽出
        parsed_info = await self.parse_schedule_info(memory.value)
        
        if parsed_info["has_datetime"]:
            event = await self.calendar_client.events().insert({
                "calendarId": "primary",
                "body": {
                    "summary": memory.summary,
                    "description": memory.value,
                    "start": {"dateTime": parsed_info["start_time"]},
                    "end": {"dateTime": parsed_info["end_time"]}
                }
            })
            return event

3. コミュニケーション・共有

# Slack統合
class SlackIntegration:
    async def share_memory_summary(self, memory: Memory, channel: str):
        """デイリーサマリーをSlackに投稿"""
        await self.slack_client.chat_postMessage({
            "channel": channel,
            "text": f"今日の学び: {memory.summary}",
            "attachments": [{
                "color": "good",
                "fields": [
                    {"title": "タグ", "value": ", ".join(memory.tags), "short": True},
                    {"title": "作成日時", "value": memory.created_at.strftime("%Y-%m-%d %H:%M"), "short": True}
                ],
                "text": memory.value[:200] + "..." if len(memory.value) > 200 else memory.value
            }]
        })

# Discord Bot統合
class DiscordIntegration:
    async def setup_memory_bot(self):
        """Discordボットでメモリ機能提供"""
        @self.bot.command(name="remember")
        async def remember_command(ctx, *, content):
            """\!remember コマンドでメモリ保存"""
            memory = await self.memory_service.create({
                "value": content,
                "source": "discord",
                "user_id": str(ctx.author.id)
            })
            await ctx.send(f"記憶しました: {memory.summary}")

4. AI・分析サービス

# Zapier統合
class ZapierIntegration:
    async def trigger_webhook(self, event_type: str, data: dict):
        """Zapier Webhookトリガー"""
        webhook_url = self.config.zapier_webhook_url
        payload = {
            "event": event_type,
            "timestamp": datetime.utcnow().isoformat(),
            "data": data
        }
        await self.http_client.post(webhook_url, json=payload)

# IFTTT統合
class IFTTTIntegration:
    async def trigger_ifttt(self, trigger_name: str, values: dict):
        """IFTTTトリガー実行"""
        url = f"https://maker.ifttt.com/trigger/{trigger_name}/with/key/{self.api_key}"
        await self.http_client.post(url, json=values)

プラグインシステム設計

プラグインアーキテクチャ

# プラグインベースクラス
from abc import ABC, abstractmethod

class MemoryPlugin(ABC):
    """メモリプラグインの基底クラス"""
    
    @abstractmethod
    async def on_memory_created(self, memory: Memory) -> None:
        """メモリ作成時の処理"""
        pass
    
    @abstractmethod
    async def on_memory_updated(self, memory: Memory) -> None:
        """メモリ更新時の処理"""
        pass
    
    @abstractmethod
    async def configure(self, config: dict) -> None:
        """プラグイン設定"""
        pass

# プラグインマネージャー
class PluginManager:
    def __init__(self):
        self.plugins: Dict[str, MemoryPlugin] = {}
    
    async def register_plugin(self, name: str, plugin: MemoryPlugin):
        """プラグイン登録"""
        await plugin.configure(self.get_plugin_config(name))
        self.plugins[name] = plugin
    
    async def trigger_event(self, event: str, data: dict):
        """全プラグインにイベント通知"""
        for plugin in self.plugins.values():
            try:
                await getattr(plugin, f"on_{event}")(data)
            except Exception as e:
                logger.error(f"Plugin error: {e}")

設定管理

# プラグイン設定
class PluginConfig:
    def __init__(self):
        self.integrations = {
            "notion": {
                "enabled": True,
                "api_token": os.getenv("NOTION_API_TOKEN"),
                "database_id": os.getenv("NOTION_DATABASE_ID"),
                "auto_sync": True
            },
            "slack": {
                "enabled": False,
                "webhook_url": os.getenv("SLACK_WEBHOOK_URL"),
                "channel": "#memories",
                "daily_summary": True
            },
            "todoist": {
                "enabled": False,
                "api_token": os.getenv("TODOIST_API_TOKEN"),
                "project_id": "default",
                "task_detection": True
            }
        }

API拡張

統合管理API

# 統合設定API
@router.get("/api/integrations")
async def list_integrations(current_user: User):
    """利用可能な統合一覧"""
    return {
        "available": [
            {
                "name": "notion",
                "display_name": "Notion",
                "description": "ナレッジベースとして同期",
                "enabled": current_user.integrations.notion.enabled,
                "config_required": ["api_token", "database_id"]
            },
            # ... other integrations
        ]
    }

@router.post("/api/integrations/{integration_name}/configure")
async def configure_integration(
    integration_name: str,
    config: dict,
    current_user: User
):
    """統合設定"""
    integration = await get_integration(integration_name)
    await integration.configure(config)
    await save_user_integration_config(current_user.id, integration_name, config)
    return {"status": "configured"}

@router.post("/api/integrations/{integration_name}/test")
async def test_integration(integration_name: str, current_user: User):
    """統合接続テスト"""
    integration = await get_integration(integration_name)
    result = await integration.test_connection()
    return {"status": "success" if result else "failed"}

自動化ワークフロー

# ワークフロー定義
class WorkflowEngine:
    async def create_workflow(self, user_id: str, workflow_def: dict):
        """ワークフロー作成"""
        workflow = {
            "id": str(uuid.uuid4()),
            "user_id": user_id,
            "name": workflow_def["name"],
            "triggers": workflow_def["triggers"],  # memory_created, tag_added, etc.
            "actions": workflow_def["actions"],    # notion_sync, slack_post, etc.
            "conditions": workflow_def.get("conditions", []),
            "enabled": True
        }
        await self.save_workflow(workflow)
        return workflow

# ワークフロー例
example_workflow = {
    "name": "技術メモをNotionに自動同期",
    "triggers": ["memory_created"],
    "conditions": [
        {"field": "tags", "operator": "contains", "value": "技術"}
    ],
    "actions": [
        {"type": "notion_sync", "config": {"database": "tech_notes"}},
        {"type": "slack_notify", "config": {"channel": "#tech-team"}}
    ]
}

データベース拡張

統合データ

-- ユーザー統合設定
CREATE TABLE user_integrations (
    id TEXT PRIMARY KEY,
    user_id TEXT REFERENCES users(id),
    integration_name TEXT NOT NULL,
    config JSON NOT NULL,
    enabled BOOLEAN DEFAULT true,
    last_sync DATETIME,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- 同期ログ
CREATE TABLE sync_logs (
    id TEXT PRIMARY KEY,
    user_id TEXT,
    integration_name TEXT,
    memory_id TEXT,
    external_id TEXT,  -- 外部サービスでのID
    sync_status TEXT,  -- success/failed/pending
    sync_data JSON,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- ワークフロー
CREATE TABLE workflows (
    id TEXT PRIMARY KEY,
    user_id TEXT REFERENCES users(id),
    name TEXT NOT NULL,
    definition JSON NOT NULL,
    enabled BOOLEAN DEFAULT true,
    execution_count INTEGER DEFAULT 0,
    last_executed DATETIME,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

セキュリティ考慮事項

OAuth認証

# OAuth2フロー
class OAuth2Integration:
    async def start_oauth_flow(self, integration_name: str, user_id: str):
        """OAuth認証開始"""
        state = secrets.token_urlsafe(32)
        await self.store_oauth_state(state, user_id, integration_name)
        
        auth_url = f"{self.auth_endpoint}?" + urllib.parse.urlencode({
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "scope": self.required_scopes,
            "state": state,
            "response_type": "code"
        })
        return auth_url

    async def handle_oauth_callback(self, code: str, state: str):
        """OAuth認証完了処理"""
        user_id = await self.verify_oauth_state(state)
        tokens = await self.exchange_code_for_tokens(code)
        await self.store_encrypted_tokens(user_id, tokens)

実装スケジュール

Phase 1: 基盤実装 (3週間)

  • プラグインシステム設計
  • 統合管理API
  • 基本的な外部サービス統合 (Notion, Slack)

Phase 2: 高度な統合 (3週間)

  • ワークフロー エンジン
  • AI連携統合
  • OAuth2認証システム

Phase 3: 拡張・最適化 (2週間)

  • 追加サービス統合
  • パフォーマンス最適化
  • セキュリティ強化

期待効果

  • ワークフロー統合: 既存ツールとのシームレス連携
  • 自動化: 手動作業の削減
  • エコシステム: Moryを中心とした情報管理環境
  • 拡張性: ユーザーニーズに応じたカスタマイズ

Comments

コメント機能は現在実装されていません。
GitHub API の comments エンドポイントを統合する予定です。

🤖 AI分析

分類結果

❓ 質問
🟡 中
47 スコア
カテゴリ 20
優先度 27
0

Details

Assignees:

None

Milestone:

None

Created:

2025/8/4

Updated:

2025/8/4