Issue #120: 分散Worker協調システムにおける非同期タスク完了管理の設計課題
Description
🎯 課題概要
Priority: HIGH
Impact: システム全体の協調動作
Component: 分散Worker協調システム
Files:
examples/poc/issue_solver_agent_distributed.py
templates/roles/queen.md
scripts/worker_communication.py
問題の詳細
現在の動作
現在のシステムでは、BeeKeeper が Queen にタスクを送信した時点で「完了」と見なされるが、実際には以下の問題が発生している:
- 早期完了判定: Queen が
[TASK_COMPLETED]
を出力した時点で処理完了と見なされる - 実際の作業は継続中: 各Worker(analyzer, developer, tester, documenter, reviewer)の実際の作業は数分〜数十分継続
- 結果の非同期性: Worker の完了結果は後から Queen に送信される(
WORKER_RESULT:...
) - 人間の判断タイミング: 人間は実際の作業完了後の結果を見て判断したい
理想的な動作フロー
BeeKeeper → Queen: タスク送信
↓
Queen → Workers: 分散タスク実行指示
↓
Workers: 実際の作業実行(数分〜数十分)
↓
Workers → Queen: 結果報告 (WORKER_RESULT:...)
↓
Queen: 全Worker完了確認 + 結果統合
↓
Queen → BeeKeeper: 最終結果報告
↓
Human: 結果確認と判断
技術的な課題
1. 非同期タスク管理
- 状態管理: 各Workerの進行状況をリアルタイムで追跡
- 完了判定: 全Worker完了の検出メカニズム
- タイムアウト: 長時間実行タスクのタイムアウト管理
2. 通信プロトコル
- 結果集約:
WORKER_RESULT:worker_name:task_id:result
の統合 - 進捗通知: 中間進捗の BeeKeeper への通知
- エラーハンドリング: Worker失敗時の対応
3. ユーザーエクスペリエンス
- 進捗表示: 「analyzer作業中...」「documenter完了」等の状態表示
- 中断機能: 長時間タスクの中断・再開機能
- 結果表示: 段階的な結果表示 vs 最終結果のみ
推奨解決策
Option A: 完全同期待機方式
# Queen が全Worker完了まで待機
async def _delegate_full_coordination_to_queen(self, parsed_request):
# 1. Queen にタスク送信
queen_result = await self.worker_communicator.send_task_to_worker(...)
# 2. Worker完了を監視
await self._monitor_worker_completion(session_id)
# 3. 最終結果を取得
final_result = await self._get_final_result(session_id)
return final_result
Option B: 非同期通知方式
# BeeKeeper が非同期で結果を受信
async def process_user_request(self, user_prompt):
# 1. 即座にタスク開始通知
print("🚀 分散タスク開始...")
# 2. 非同期で結果を待機
asyncio.create_task(self._monitor_and_report_results())
# 3. 進捗を定期的に表示
await self._show_progress_updates()
Option C: ハイブリッド方式
# 設定可能な待機モード
class DistributedBeeKeeperAgent:
def __init__(self, wait_mode="sync"): # "sync" | "async" | "progress"
self.wait_mode = wait_mode
async def process_user_request(self, user_prompt):
if self.wait_mode == "sync":
return await self._sync_processing(user_prompt)
elif self.wait_mode == "async":
return await self._async_processing(user_prompt)
else: # progress
return await self._progress_processing(user_prompt)
実装上の考慮事項
1. タイムアウト管理
WORKER_TIMEOUT = {
"analyzer": 300, # 5分
"developer": 1800, # 30分
"tester": 600, # 10分
"documenter": 300, # 5分
"reviewer": 600, # 10分
}
2. 進捗監視
async def _monitor_worker_progress(self, session_id):
"""Worker進捗のリアルタイム監視"""
while not self._all_workers_completed(session_id):
status = self._get_worker_status(session_id)
self._display_progress(status)
await asyncio.sleep(30) # 30秒ごとに更新
3. 結果集約
async def _collect_worker_results(self, session_id):
"""Worker結果の段階的収集"""
results = {}
while len(results) < self.expected_worker_count:
result = await self._wait_for_worker_result()
results[result["worker_name"]] = result
self._display_partial_result(result)
return results
設計上の判断ポイント
1. 同期 vs 非同期
- 同期: シンプルだが、長時間のブロッキング
- 非同期: 複雑だが、レスポンシブなUX
2. 進捗表示の粒度
- 詳細表示: 各Workerの進捗をリアルタイム表示
- 最終結果のみ: 完了時にまとめて表示
3. エラー処理
- 部分失敗: 一部Workerが失敗した場合の対応
- 全体失敗: システム全体の失敗時の回復
関連技術要素
tmux セッション管理
# Worker状態の監視
tmux capture-pane -t cozy-hive:analyzer -p | tail -5
tmux capture-pane -t cozy-hive:documenter -p | tail -5
状態管理パターン
class TaskSessionManager:
def __init__(self):
self.sessions = {} # session_id -> TaskSession
async def create_session(self, session_id, workers):
session = TaskSession(session_id, workers)
self.sessions[session_id] = session
return session
async def monitor_session(self, session_id):
session = self.sessions[session_id]
return await session.wait_for_completion()
次のステップ
- 設計決定: 同期・非同期・ハイブリッドのどれを採用するか
- プロトタイプ: 選択した方式の基本実装
- テスト: 実際のWorker連携での動作確認
- UX改善: 進捗表示とユーザーインターフェースの最適化
この課題は分散システムの本質的な同期問題であり、Hiveシステムの使いやすさを大きく左右する重要な設計判断となります。
🤖 Generated with Claude Code
Co-Authored-By: Claude noreply@anthropic.com
Comments
コメント機能は現在実装されていません。
GitHub API の comments エンドポイントを統合する予定です。
🤖 AI分析
分類結果
適用されたルール
Details
None
None
2025/7/16
2025/7/17