diff --git a/src-tauri/src/engine/budget.rs b/src-tauri/src/engine/budget.rs index bf43c17..d333008 100644 --- a/src-tauri/src/engine/budget.rs +++ b/src-tauri/src/engine/budget.rs @@ -149,13 +149,24 @@ pub fn get_weekly_usage() -> Result<(i64, String), String> { /// usage earlier in the week from falsely exhausting the budget for remaining days — /// the actual subscription resets on rolling windows, not as a rigid weekly lump sum. pub fn calculate_budget_status(config: &BudgetConfig) -> BudgetStatus { + calculate_budget_status_with_reservation(config, 0) +} + +/// Calculate budget status accounting for tokens already reserved by +/// in-flight tasks. Used to prevent over-commit when multiple tasks +/// start concurrently. +pub fn calculate_budget_status_with_reservation( + config: &BudgetConfig, + tokens_reserved: i64, +) -> BudgetStatus { let (tokens_today, _) = get_today_usage().unwrap_or((0, "unavailable".to_string())); let (tokens_week, source) = get_weekly_usage().unwrap_or((0, "unavailable".to_string())); let daily_budget = config.weekly_token_budget / 7; let max_for_sustn = daily_budget * (config.max_usage_percent as i64) / 100; let reserve = daily_budget * (config.reserve_percent as i64) / 100; - let available = (max_for_sustn - tokens_today - reserve).max(0); + let available = + (max_for_sustn - tokens_today - reserve - tokens_reserved).max(0); BudgetStatus { weekly_token_budget: config.weekly_token_budget, diff --git a/src-tauri/src/engine/git.rs b/src-tauri/src/engine/git.rs index 3ffd224..68e64dd 100644 --- a/src-tauri/src/engine/git.rs +++ b/src-tauri/src/engine/git.rs @@ -8,7 +8,7 @@ pub struct GitResult { pub error: Option, } -fn run_git(cwd: &str, args: &[&str]) -> GitResult { +pub(crate) fn run_git(cwd: &str, args: &[&str]) -> GitResult { match Command::new("git").args(args).current_dir(cwd).output() { Ok(output) => { let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string(); @@ -257,6 +257,103 @@ pub fn detect_default_branch(cwd: &str) -> String { "main".to_string() } +/// Clone a repository. Runs in the parent directory of `destination`. +/// Uses GIT_TERMINAL_PROMPT=0 to prevent interactive credential prompts +/// from hanging the process. Skips LFS to avoid downloading large +/// binary files that aren't needed for code review. +pub fn clone_repo(url: &str, destination: &str) -> GitResult { + let dest_path = std::path::Path::new(destination); + + // Create parent directory if needed + if let Some(parent) = dest_path.parent() { + let _ = std::fs::create_dir_all(parent); + } + + match Command::new("git") + .args(["clone", url, destination]) + .env("GIT_TERMINAL_PROMPT", "0") + .env("GIT_LFS_SKIP_SMUDGE", "1") + .output() + { + Ok(output) => { + let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string(); + let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); + if output.status.success() { + GitResult { + success: true, + output: destination.to_string(), + error: None, + } + } else { + GitResult { + success: false, + output: stdout, + error: Some(stderr), + } + } + } + Err(e) => GitResult { + success: false, + output: String::new(), + error: Some(format!("Failed to execute git clone: {}", e)), + }, + } +} + +/// Fetch a specific branch from origin and create a local branch. +/// Tries the branch name first, then falls back to `pull//head` +/// for fork-based PRs where the branch doesn't exist on the upstream remote. +pub fn fetch_branch(cwd: &str, branch_name: &str) -> GitResult { + fetch_branch_with_pr(cwd, branch_name, None) +} + +/// Fetch a branch, with an optional PR number for fork-based PR fallback. +pub fn fetch_branch_with_pr(cwd: &str, branch_name: &str, pr_number: Option) -> GitResult { + // Try fetching by branch name first (works for same-repo PRs) + let refspec = format!( + "refs/heads/{}:refs/remotes/origin/{}", + branch_name, branch_name + ); + let fetch = run_git(cwd, &["fetch", "origin", &refspec]); + + if !fetch.success { + if let Some(pr_num) = pr_number { + // Fallback: fetch via GitHub's PR ref (works for fork-based PRs) + println!( + "[git] branch {} not found on origin, trying pull/{}/head", + branch_name, pr_num + ); + let pr_refspec = format!( + "refs/pull/{}/head:refs/remotes/origin/{}", + pr_num, branch_name + ); + let pr_fetch = run_git(cwd, &["fetch", "origin", &pr_refspec]); + if !pr_fetch.success { + return pr_fetch; + } + } else { + return fetch; + } + } + + // Delete stale local branch if it exists (may point to wrong commit) + let _ = run_git(cwd, &["branch", "-D", branch_name]); + // Create local branch from the fetched ref + run_git( + cwd, + &[ + "branch", + branch_name, + &format!("origin/{}", branch_name), + ], + ) +} + +/// Get the remote URL for origin. +pub fn get_remote_url(cwd: &str) -> GitResult { + run_git(cwd, &["remote", "get-url", "origin"]) +} + /// Generate a branch name for a task. pub fn task_branch_name(task_id: &str) -> String { // Use first 8 chars of UUID for readability diff --git a/src-tauri/src/engine/mod.rs b/src-tauri/src/engine/mod.rs index 699a07f..d5811eb 100644 --- a/src-tauri/src/engine/mod.rs +++ b/src-tauri/src/engine/mod.rs @@ -5,33 +5,49 @@ pub mod prioritizer; pub mod scanner; pub mod scheduler; pub mod worker; +pub mod worktree; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use tauri::Emitter; use tokio::sync::{Mutex, RwLock}; +/// Default number of tasks that can run concurrently. +pub const DEFAULT_CONCURRENCY_LIMIT: usize = 5; + /// Global engine state shared across Tauri commands and the background scheduler. pub struct EngineState { /// Whether the engine scheduler loop is running. pub running: RwLock, - /// The currently executing task (if any). Only one task runs at a time. - pub current_task: Mutex>, + /// Currently executing tasks, keyed by task_id. + pub running_tasks: Mutex>, /// Handle to cancel the scheduler loop. pub cancel_token: Mutex>>, /// Repository IDs that currently have a deep scan in progress. /// Task execution waits for the scan to finish before starting, /// preventing concurrent Claude CLI instances in the same repo. pub deep_scanning_repos: Mutex>, + /// Maximum number of tasks that can run concurrently. + pub concurrency_limit: RwLock, + /// Total tokens reserved by in-flight tasks (prevents over-commit + /// when multiple tasks start near-simultaneously). + pub tokens_reserved: Mutex, + /// Number of scans currently running. Counted against the concurrency + /// limit so scans and tasks don't over-subscribe Claude CLI. + pub active_scans: Mutex, } impl EngineState { pub fn new() -> Arc { Arc::new(Self { running: RwLock::new(false), - current_task: Mutex::new(None), + running_tasks: Mutex::new(HashMap::new()), cancel_token: Mutex::new(None), deep_scanning_repos: Mutex::new(HashSet::new()), + concurrency_limit: RwLock::new(DEFAULT_CONCURRENCY_LIMIT), + tokens_reserved: Mutex::new(0), + active_scans: Mutex::new(0), }) } } @@ -63,11 +79,129 @@ pub struct CliResult { pub session_id: Option, } -/// Extract session_id from Claude CLI JSON output. -fn extract_session_id(stdout: &str) -> Option { - serde_json::from_str::(stdout.trim()) - .ok() - .and_then(|v| v.get("session_id")?.as_str().map(|s| s.to_string())) +/// Event emitted for each line of Claude CLI output during streaming. +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct TaskOutputEvent { + pub task_id: String, + /// Parsed "type" from stream-json: "system", "assistant", "user", "result" + pub event_type: Option, + /// Structured content blocks extracted from the event + pub blocks: Vec, + /// Full JSON line (for debugging) + pub raw: String, + pub timestamp: String, +} + +/// A parsed content block from a stream-json event. +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ContentBlock { + /// "text", "tool_use", "tool_result", "thinking" + pub kind: String, + /// Text content, tool summary, or result summary + pub text: Option, + /// Tool name (for tool_use and tool_result) + pub tool_name: Option, + /// File path or key input hint (for tool_use) + pub tool_target: Option, +} + +/// Extract human-readable content blocks from a stream-json event. +/// Returns multiple blocks because a single assistant event can contain +/// both text and tool_use blocks. +fn extract_blocks(event_type: &str, value: &serde_json::Value) -> Vec { + match event_type { + "assistant" | "user" => { + // Both assistant and user events have message.content[] arrays. + // Assistant events contain text and tool_use blocks. + // User events contain tool_result blocks. + value + .get("message") + .and_then(|m| m.get("content")) + .and_then(|c| c.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|item| extract_block(item)) + .collect() + }) + .unwrap_or_default() + } + "result" => { + let text = value + .get("result") + .and_then(|r| r.as_str()) + .map(|s| s.chars().take(500).collect::()); + vec![ContentBlock { + kind: "result".to_string(), + text, + tool_name: None, + tool_target: None, + }] + } + _ => vec![], + } +} + +fn extract_block(item: &serde_json::Value) -> Option { + let kind = item.get("type")?.as_str()?; + match kind { + "text" => Some(ContentBlock { + kind: "text".to_string(), + text: item.get("text").and_then(|t| t.as_str()).map(String::from), + tool_name: None, + tool_target: None, + }), + "tool_use" => { + let name = item.get("name").and_then(|n| n.as_str()).map(String::from); + let target = item.get("input").and_then(|i| { + i.get("file_path") + .or_else(|| i.get("path")) + .or_else(|| i.get("pattern")) + .or_else(|| i.get("command")) + .or_else(|| i.get("url")) + .and_then(|v| v.as_str()) + .map(|s| s.chars().take(200).collect::()) + }); + Some(ContentBlock { + kind: "tool_use".to_string(), + text: None, + tool_name: name, + tool_target: target, + }) + } + "tool_result" => { + let content_text = item.get("content").and_then(|c| { + // tool_result content can be a string or an array of {type, text} + if let Some(s) = c.as_str() { + Some(s.chars().take(300).collect::()) + } else if let Some(arr) = c.as_array() { + arr.iter() + .filter_map(|b| b.get("text")?.as_str()) + .next() + .map(|s| s.chars().take(300).collect::()) + } else { + None + } + }); + Some(ContentBlock { + kind: "tool_result".to_string(), + text: content_text, + tool_name: None, + tool_target: None, + }) + } + "thinking" => Some(ContentBlock { + kind: "thinking".to_string(), + text: item + .get("thinking") + .and_then(|t| t.as_str()) + .map(|s| s.chars().take(300).collect::()), + tool_name: None, + tool_target: None, + }), + _ => None, + } } /// Resolve the full path to the `claude` CLI binary. @@ -105,7 +239,11 @@ fn resolve_claude_binary() -> String { } /// Invoke Claude Code CLI with the given prompt in the given working directory. -/// This is the core primitive that both scanner and worker use. +/// +/// Uses `--output-format stream-json --verbose` to stream newline-delimited +/// JSON events in real time. If `app_handle` and `task_id` are provided, +/// each event is emitted as an `agent:task-output` Tauri event for the +/// frontend to display live. /// /// If `stdin_content` is provided, it is piped to the process via stdin /// (used to pass pre-read file context, matching nightshift's approach). @@ -119,31 +257,28 @@ pub async fn invoke_claude_cli( stdin_content: Option<&str>, max_turns: Option, resume_session_id: Option<&str>, + app_handle: Option, + task_id: Option<&str>, ) -> Result { use std::process::Stdio; - use tokio::io::AsyncWriteExt; + use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::Command; println!( - "[engine] invoke_claude_cli — cwd={cwd}, timeout={timeout_secs}s, prompt_len={}, stdin_len={}, max_turns={:?}", + "[engine] invoke_claude_cli — cwd={cwd}, timeout={timeout_secs}s, prompt_len={}, stdin_len={}, max_turns={:?}, streaming={}", prompt.len(), stdin_content.map_or(0, |s| s.len()), max_turns, + app_handle.is_some(), ); - println!("[engine] ┌─── PROMPT ───────────────────────────────────────"); - for line in prompt.lines() { - println!("[engine] │ {line}"); - } - println!("[engine] └─────────────────────────────────────────────────"); - - let has_stdin = stdin_content.is_some(); let claude_bin = resolve_claude_binary(); let mut cmd = Command::new(&claude_bin); cmd.args([ "--print", "--output-format", - "json", + "stream-json", + "--verbose", "--dangerously-skip-permissions", ]); if let Some(session_id) = resume_session_id { @@ -154,23 +289,20 @@ pub async fn invoke_claude_cli( cmd.args(["--max-turns", &turns.to_string()]); } cmd.current_dir(cwd); + cmd.stdout(Stdio::piped()); + cmd.stderr(Stdio::piped()); - if has_stdin { + if stdin_content.is_some() { cmd.stdin(Stdio::piped()); } - // When we have stdin, we need spawn() to write to it; otherwise .output() is simpler + let mut child = cmd.spawn().map_err(|e| { + println!("[engine] claude CLI failed to spawn: {e}"); + format!("Failed to execute claude CLI: {}", e) + })?; + + // Write stdin content if provided if let Some(content) = stdin_content { - let mut child = cmd - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - .map_err(|e| { - println!("[engine] claude CLI failed to spawn: {e}"); - format!("Failed to execute claude CLI: {}", e) - })?; - - // Write stdin content and close the pipe if let Some(mut stdin_pipe) = child.stdin.take() { let content = content.to_string(); tokio::spawn(async move { @@ -178,131 +310,150 @@ pub async fn invoke_claude_cli( let _ = stdin_pipe.shutdown().await; }); } + } - // Take stdout/stderr before wait() so child isn't consumed - let stdout_pipe = child.stdout.take(); - let stderr_pipe = child.stderr.take(); + // Read stdout line-by-line (stream-json emits one JSON object per line) + let stdout_pipe = child.stdout.take(); + let stderr_pipe = child.stderr.take(); - let stdout_task = tokio::spawn(async move { - let mut buf = Vec::new(); - if let Some(mut pipe) = stdout_pipe { - let _ = tokio::io::AsyncReadExt::read_to_end(&mut pipe, &mut buf).await; - } - buf - }); + let app_for_stdout = app_handle.clone(); + let task_id_for_stdout = task_id.map(|s| s.to_string()); - let stderr_task = tokio::spawn(async move { - let mut buf = Vec::new(); - if let Some(mut pipe) = stderr_pipe { - let _ = tokio::io::AsyncReadExt::read_to_end(&mut pipe, &mut buf).await; - } - buf - }); - - let result = tokio::time::timeout( - std::time::Duration::from_secs(timeout_secs), - child.wait(), - ) - .await; - - match result { - Ok(Ok(status)) => { - let stdout_bytes = stdout_task.await.unwrap_or_default(); - let stderr_bytes = stderr_task.await.unwrap_or_default(); - let stdout = String::from_utf8_lossy(&stdout_bytes).to_string(); - let stderr = String::from_utf8_lossy(&stderr_bytes).to_string(); - let session_id = extract_session_id(&stdout); - println!( - "[engine] claude CLI finished — exit_code={:?}, stdout_len={}, stderr_len={}, session_id={:?}", - status.code(), - stdout.len(), - stderr.len(), - session_id, - ); - println!("[engine] ┌─── STDOUT ──────────────────────────────────────"); - for line in stdout.lines() { - println!("[engine] │ {line}"); + let stdout_task = tokio::spawn(async move { + let mut lines = Vec::new(); + let mut session_id: Option = None; + let mut result_text: Option = None; + + if let Some(pipe) = stdout_pipe { + let reader = BufReader::new(pipe); + let mut line_stream = reader.lines(); + + while let Ok(Some(line)) = line_stream.next_line().await { + if line.trim().is_empty() { + continue; } - println!("[engine] └─────────────────────────────────────────────────"); - if !stderr.is_empty() { - println!("[engine] ┌─── STDERR ──────────────────────────────────────"); - for line in stderr.lines() { - println!("[engine] │ {line}"); + + // Try to parse as JSON to extract event type and content + let parsed = serde_json::from_str::(&line); + let event_type = parsed + .as_ref() + .ok() + .and_then(|v| v.get("type")?.as_str().map(|s| s.to_string())); + let blocks = parsed + .as_ref() + .ok() + .map(|v| { + let et = event_type.as_deref().unwrap_or(""); + extract_blocks(et, v) + }) + .unwrap_or_default(); + + // Extract session_id and result from the final "result" event + if event_type.as_deref() == Some("result") { + if let Ok(ref v) = parsed { + session_id = + v.get("session_id").and_then(|s| s.as_str().map(|s| s.to_string())); + result_text = + v.get("result").and_then(|r| r.as_str().map(|s| s.to_string())); } - println!("[engine] └─────────────────────────────────────────────────"); } - Ok(CliResult { - success: status.success(), - stdout, - stderr, - exit_code: status.code(), - session_id, - }) - } - Ok(Err(e)) => { - println!("[engine] claude CLI failed: {e}"); - Err(format!("Failed to execute claude CLI: {}", e)) - } - Err(_) => { - let _ = child.kill().await; - println!("[engine] claude CLI timed out after {timeout_secs}s"); - Err(format!( - "Claude CLI timed out after {} seconds", - timeout_secs - )) - } - } - } else { - // No stdin — simple .output() path - let result = tokio::time::timeout( - std::time::Duration::from_secs(timeout_secs), - cmd.output(), - ) - .await; - - match result { - Ok(Ok(output)) => { - let stdout = String::from_utf8_lossy(&output.stdout).to_string(); - let stderr = String::from_utf8_lossy(&output.stderr).to_string(); - let session_id = extract_session_id(&stdout); - println!( - "[engine] claude CLI finished — exit_code={:?}, stdout_len={}, stderr_len={}, session_id={:?}", - output.status.code(), - stdout.len(), - stderr.len(), - session_id, - ); - println!("[engine] ┌─── STDOUT ──────────────────────────────────────"); - for line in stdout.lines() { - println!("[engine] │ {line}"); - } - println!("[engine] └─────────────────────────────────────────────────"); - if !stderr.is_empty() { - println!("[engine] ┌─── STDERR ──────────────────────────────────────"); - for line in stderr.lines() { - println!("[engine] │ {line}"); + + // Skip events with no meaningful content (except system init and result) + let should_emit = !blocks.is_empty() + || event_type.as_deref() == Some("system") + || event_type.as_deref() == Some("result"); + + if should_emit { + if let (Some(ref app), Some(ref tid)) = + (&app_for_stdout, &task_id_for_stdout) + { + let event = TaskOutputEvent { + task_id: tid.clone(), + event_type: event_type.clone(), + blocks, + raw: line.clone(), + timestamp: chrono::Local::now().to_rfc3339(), + }; + let _ = app.emit("agent:task-output", &event); } - println!("[engine] └─────────────────────────────────────────────────"); } - Ok(CliResult { - success: output.status.success(), - stdout, - stderr, - exit_code: output.status.code(), - session_id, - }) - } - Ok(Err(e)) => { - println!("[engine] claude CLI failed to execute: {e}"); - Err(format!("Failed to execute claude CLI: {}", e)) + + lines.push(line); } - Err(_) => { - println!("[engine] claude CLI timed out after {timeout_secs}s"); - Err(format!( - "Claude CLI timed out after {} seconds", - timeout_secs - )) + } + + (lines, session_id, result_text) + }); + + let stderr_task = tokio::spawn(async move { + let mut stderr_lines = Vec::new(); + if let Some(pipe) = stderr_pipe { + let reader = BufReader::new(pipe); + let mut line_stream = reader.lines(); + while let Ok(Some(line)) = line_stream.next_line().await { + stderr_lines.push(line); } } + stderr_lines + }); + + // Wait for the process with timeout + let result = tokio::time::timeout( + std::time::Duration::from_secs(timeout_secs), + child.wait(), + ) + .await; + + match result { + Ok(Ok(status)) => { + let (stdout_lines, cli_session_id, result_text) = + stdout_task.await.unwrap_or_else(|_| (vec![], None, None)); + let stderr_lines = stderr_task.await.unwrap_or_default(); + let stderr = stderr_lines.join("\n"); + + // Synthesize stdout as if it came from --output-format json + // so that parse_implement_output / parse_review_output still work. + let stdout = if let Some(ref result) = result_text { + // Build a JSON envelope matching the old format + let envelope = serde_json::json!({ + "type": "result", + "subtype": "success", + "result": result, + "session_id": cli_session_id, + }); + envelope.to_string() + } else { + // Fallback: join all lines (shouldn't normally happen) + stdout_lines.join("\n") + }; + + println!( + "[engine] claude CLI finished — exit_code={:?}, stdout_len={}, stderr_len={}, session_id={:?}", + status.code(), + stdout.len(), + stderr.len(), + cli_session_id, + ); + + Ok(CliResult { + success: status.success(), + stdout, + stderr, + exit_code: status.code(), + session_id: cli_session_id, + }) + } + Ok(Err(e)) => { + println!("[engine] claude CLI failed: {e}"); + Err(format!("Failed to execute claude CLI: {}", e)) + } + Err(_) => { + let _ = child.kill().await; + println!("[engine] claude CLI timed out after {timeout_secs}s"); + Err(format!( + "Claude CLI timed out after {} seconds", + timeout_secs + )) + } } } diff --git a/src-tauri/src/engine/scanner.rs b/src-tauri/src/engine/scanner.rs index e1255bd..f9c2e44 100644 --- a/src-tauri/src/engine/scanner.rs +++ b/src-tauri/src/engine/scanner.rs @@ -309,7 +309,7 @@ pub async fn scan_repository(repo_path: &str, scan_preferences: Option<&str>) -> let timeout_secs = 300; // 5 minutes let start = std::time::Instant::now(); - let result = invoke_claude_cli(repo_path, &augmented_prompt, timeout_secs, Some(&file_context), None, None).await; + let result = invoke_claude_cli(repo_path, &augmented_prompt, timeout_secs, Some(&file_context), None, None, None, None).await; let elapsed = start.elapsed(); println!("[engine] pass 1 scan took {:.1}s", elapsed.as_secs_f64()); @@ -348,7 +348,7 @@ pub async fn deep_scan_repository( let timeout_secs = 600; // 10 minutes — Claude uses tools here let start = std::time::Instant::now(); - let result = invoke_claude_cli(repo_path, &deep_prompt, timeout_secs, None, None, None).await; + let result = invoke_claude_cli(repo_path, &deep_prompt, timeout_secs, None, None, None, None, None).await; let elapsed = start.elapsed(); println!("[engine] pass 2 deep scan took {:.1}s", elapsed.as_secs_f64()); diff --git a/src-tauri/src/engine/worker.rs b/src-tauri/src/engine/worker.rs index 88e058e..b977d61 100644 --- a/src-tauri/src/engine/worker.rs +++ b/src-tauri/src/engine/worker.rs @@ -76,90 +76,12 @@ pub async fn execute_task( user_messages: Option, resume_session_id: Option, agent_preferences: Option<&str>, + app_handle: Option, ) -> WorkResult { println!("[worker] execute_task START — task_id={task_id}, title={task_title}, base_branch={base_branch}, branch={branch_name}, files={files_involved:?}, has_user_messages={}, resume_session={:?}", user_messages.is_some(), resume_session_id); - // Save original branch so we can return to it - let original_branch = git::current_branch(repo_path); - if !original_branch.success { - return WorkResult { - success: false, - phase_reached: TaskPhase::Planning, - branch_name: None, - commit_sha: None, - files_modified: vec![], - summary: None, - error: Some(format!( - "Could not determine current branch: {}", - original_branch.error.unwrap_or_else(|| "unknown error".to_string()) - )), - review_warnings: None, - session_id: None, - }; - } - let original_branch_name = original_branch.output.clone(); - - // Auto-stash if working tree is dirty (safety net for queue transitions) - if !git::is_clean(repo_path) { - println!("[worker] dirty working tree — auto-stashing"); - let stash = git::stash(repo_path); - if !stash.success { - return WorkResult { - success: false, - phase_reached: TaskPhase::Planning, - branch_name: None, - commit_sha: None, - files_modified: vec![], - summary: None, - error: Some( - "Working tree is dirty and auto-stash failed. Commit or stash changes manually.".to_string(), - ), - review_warnings: None, - session_id: None, - }; - } - } - - // Create task branch from the specified base branch - if git::branch_exists(repo_path, branch_name) { - // Branch already exists — checkout it - let checkout = git::checkout_branch(repo_path, branch_name); - if !checkout.success { - return WorkResult { - success: false, - phase_reached: TaskPhase::Planning, - branch_name: Some(branch_name.to_string()), - commit_sha: None, - files_modified: vec![], - summary: None, - error: Some(format!( - "Failed to checkout branch: {}", - checkout.error.unwrap_or_else(|| "unknown error".to_string()) - )), - review_warnings: None, - session_id: None, - }; - } - } else { - let create = git::create_branch_from(repo_path, branch_name, base_branch); - if !create.success { - return WorkResult { - success: false, - phase_reached: TaskPhase::Planning, - branch_name: Some(branch_name.to_string()), - commit_sha: None, - files_modified: vec![], - summary: None, - error: Some(format!( - "Failed to create branch from {}: {}", - base_branch, - create.error.unwrap_or_else(|| "unknown error".to_string()) - )), - review_warnings: None, - session_id: None, - }; - } - } + // repo_path is a worktree with the task branch already checked out. + // No need to save/restore branches or stash — the worktree is isolated. // Run the implement phase (we skip separate plan phase for now — // Claude Code is capable enough to plan inline during implementation) @@ -176,7 +98,7 @@ pub async fn execute_task( println!("[worker] running implement phase..."); let implement_result = - run_implement_phase(repo_path, task_id, task_title, task_description, files_involved, &last_feedback, current_resume_id.as_deref(), agent_preferences) + run_implement_phase(repo_path, task_id, task_title, task_description, files_involved, &last_feedback, current_resume_id.as_deref(), agent_preferences, app_handle.clone()) .await; // Only resume on the first attempt current_resume_id = None; @@ -200,7 +122,6 @@ pub async fn execute_task( // fail early instead of running a pointless review. if !git::has_commits_ahead(repo_path, base_branch) { println!("[worker] NO COMMITS on branch — Claude likely did not commit"); - let _ = git::checkout_branch(repo_path, &original_branch_name); return WorkResult { success: false, phase_reached: TaskPhase::Implementing, @@ -220,9 +141,11 @@ pub async fn execute_task( println!("[worker] running review phase..."); let review_result = run_review_phase( repo_path, + task_id, task_title, &impl_output.summary.clone().unwrap_or_default(), agent_preferences, + app_handle.clone(), ) .await; @@ -240,10 +163,9 @@ pub async fn execute_task( match review_result { Ok(review) if review.passed.unwrap_or(false) => { - // Success — get commit SHA and return to original branch + // Success — get commit SHA let sha = git::latest_commit_sha(repo_path); - println!("[worker] REVIEW PASSED — sha={:?}, returning to branch {original_branch_name}", sha.output); - let _ = git::checkout_branch(repo_path, &original_branch_name); + println!("[worker] REVIEW PASSED — sha={:?}", sha.output); return WorkResult { success: true, @@ -277,7 +199,6 @@ pub async fn execute_task( if !has_critical { let sha = git::latest_commit_sha(repo_path); println!("[worker] SOFT-PASS — no critical issues after {attempt} attempts, accepting with warnings"); - let _ = git::checkout_branch(repo_path, &original_branch_name); return WorkResult { success: true, phase_reached: TaskPhase::Reviewing, @@ -298,8 +219,6 @@ pub async fn execute_task( } // Genuine critical issues that couldn't be fixed - let _ = - git::checkout_branch(repo_path, &original_branch_name); return WorkResult { success: false, phase_reached: TaskPhase::Reviewing, @@ -321,7 +240,6 @@ pub async fn execute_task( last_feedback = review.feedback.clone(); } Err(e) => { - let _ = git::checkout_branch(repo_path, &original_branch_name); return WorkResult { success: false, phase_reached: TaskPhase::Reviewing, @@ -337,7 +255,6 @@ pub async fn execute_task( } } Err(e) => { - let _ = git::checkout_branch(repo_path, &original_branch_name); return WorkResult { success: false, phase_reached: TaskPhase::Implementing, @@ -363,6 +280,7 @@ async fn run_implement_phase( previous_feedback: &Option, resume_session_id: Option<&str>, agent_preferences: Option<&str>, + app_handle: Option, ) -> Result { let prefs_section = match agent_preferences { Some(prefs) if !prefs.trim().is_empty() => format!("\n\n## Project-Specific Instructions\n{prefs}"), @@ -435,7 +353,7 @@ When done, output ONLY this JSON (no markdown, no explanation): }; println!("[worker:implement] invoking Claude CLI — prompt_len={}, resume={}", prompt.len(), resume_session_id.is_some()); - let result = invoke_claude_cli(repo_path, &prompt, WORK_TIMEOUT_SECS, None, None, resume_session_id).await?; + let result = invoke_claude_cli(repo_path, &prompt, WORK_TIMEOUT_SECS, None, None, resume_session_id, app_handle, Some(task_id)).await?; println!( "[worker:implement] Claude CLI returned — success={}, exit={:?}, stdout_len={}, stderr_len={}", result.success, result.exit_code, result.stdout.len(), result.stderr.len() @@ -461,9 +379,11 @@ When done, output ONLY this JSON (no markdown, no explanation): async fn run_review_phase( repo_path: &str, + task_id: &str, task_title: &str, implementation_summary: &str, agent_preferences: Option<&str>, + app_handle: Option, ) -> Result { let prefs_section = match agent_preferences { Some(prefs) if !prefs.trim().is_empty() => format!("\n\n## Project-Specific Instructions\n{prefs}"), @@ -507,7 +427,7 @@ Output ONLY this JSON (no markdown, no explanation): ); println!("[worker:review] invoking Claude CLI — prompt_len={}", prompt.len()); - let result = invoke_claude_cli(repo_path, &prompt, WORK_TIMEOUT_SECS, None, None, None).await?; + let result = invoke_claude_cli(repo_path, &prompt, WORK_TIMEOUT_SECS, None, None, None, app_handle, Some(task_id)).await?; println!( "[worker:review] Claude CLI returned — success={}, exit={:?}, stdout_len={}, stderr_len={}", result.success, result.exit_code, result.stdout.len(), result.stderr.len() diff --git a/src-tauri/src/engine/worktree.rs b/src-tauri/src/engine/worktree.rs new file mode 100644 index 0000000..b6afc44 --- /dev/null +++ b/src-tauri/src/engine/worktree.rs @@ -0,0 +1,170 @@ +use std::fs; +use std::path::Path; + +use super::git; + +/// Compute the worktree directory path for a task. +/// Uses first 8 chars of the task ID for readability. +pub fn get_worktree_path(repo_path: &str, task_id: &str) -> String { + let short_id = if task_id.len() >= 8 { + &task_id[..8] + } else { + task_id + }; + format!("{}/.sustn/worktrees/{}", repo_path, short_id) +} + +/// Create a git worktree for a task. Idempotent — if the worktree already +/// exists, returns the existing path without error. +/// +/// For new branches: `git worktree add -b ` +/// For existing branches: `git worktree add ` +pub fn create_worktree( + repo_path: &str, + task_id: &str, + branch_name: &str, + base_branch: &str, +) -> Result { + let wt_path = get_worktree_path(repo_path, task_id); + + // If the worktree directory already exists, verify it's valid + if Path::new(&wt_path).exists() { + // Check if it's a valid git worktree by running a git command in it + let check = git::run_git(&wt_path, &["rev-parse", "--git-dir"]); + if check.success { + println!( + "[worktree] reusing existing worktree at {} for task {}", + wt_path, task_id + ); + return Ok(wt_path); + } + // Directory exists but isn't a valid worktree — clean it up + println!( + "[worktree] removing stale worktree directory at {}", + wt_path + ); + let _ = fs::remove_dir_all(&wt_path); + // Also prune stale worktree entries + let _ = git::run_git(repo_path, &["worktree", "prune"]); + } + + // Ensure parent directory exists + let parent = format!("{}/.sustn/worktrees", repo_path); + fs::create_dir_all(&parent) + .map_err(|e| format!("Failed to create worktree directory: {}", e))?; + + // Try to create the worktree + let result = if git::branch_exists(repo_path, branch_name) { + // Branch already exists — just check it out in the worktree + git::run_git(repo_path, &["worktree", "add", &wt_path, branch_name]) + } else { + // Create a new branch from base + git::run_git( + repo_path, + &[ + "worktree", + "add", + &wt_path, + "-b", + branch_name, + base_branch, + ], + ) + }; + + if result.success { + println!( + "[worktree] created worktree at {} (branch: {}, base: {})", + wt_path, branch_name, base_branch + ); + Ok(wt_path) + } else { + let err = result.error.unwrap_or_else(|| "unknown error".to_string()); + + // Handle "already checked out" — find the existing worktree + if err.contains("already checked out") || err.contains("is already used by worktree") { + println!( + "[worktree] branch {} already checked out, looking for existing worktree", + branch_name + ); + // The branch is checked out somewhere — list worktrees to find it + let list = git::run_git(repo_path, &["worktree", "list", "--porcelain"]); + if list.success { + for line in list.output.lines() { + if let Some(path) = line.strip_prefix("worktree ") { + let branch_check = git::current_branch(path); + if branch_check.success && branch_check.output == branch_name { + println!( + "[worktree] found branch {} in existing worktree at {}", + branch_name, path + ); + return Ok(path.to_string()); + } + } + } + } + } + + Err(format!( + "Failed to create worktree for task {}: {}", + task_id, err + )) + } +} + +/// Remove a task's worktree. Tolerates missing worktrees. +pub fn remove_worktree(repo_path: &str, task_id: &str) -> Result<(), String> { + let wt_path = get_worktree_path(repo_path, task_id); + + if !Path::new(&wt_path).exists() { + // Already gone — prune any stale refs + let _ = git::run_git(repo_path, &["worktree", "prune"]); + return Ok(()); + } + + println!("[worktree] removing worktree at {}", wt_path); + let result = git::run_git(repo_path, &["worktree", "remove", "--force", &wt_path]); + + if result.success { + Ok(()) + } else { + // Force-remove the directory if git worktree remove fails + println!( + "[worktree] git worktree remove failed, falling back to directory removal: {:?}", + result.error + ); + let _ = fs::remove_dir_all(&wt_path); + let _ = git::run_git(repo_path, &["worktree", "prune"]); + Ok(()) + } +} + +/// Ensure `.sustn/` is in the repo's `.gitignore`. +/// Idempotent — does nothing if the entry already exists. +pub fn ensure_gitignore_entry(repo_path: &str) -> Result<(), String> { + let gitignore_path = format!("{}/.gitignore", repo_path); + let path = Path::new(&gitignore_path); + + if path.exists() { + let content = + fs::read_to_string(path).map_err(|e| format!("Failed to read .gitignore: {}", e))?; + // Check if .sustn/ is already ignored (any common form) + if content.lines().any(|line| { + let trimmed = line.trim(); + trimmed == ".sustn/" || trimmed == ".sustn" || trimmed == "/.sustn/" + }) { + return Ok(()); + } + // Append the entry + let suffix = if content.ends_with('\n') { "" } else { "\n" }; + fs::write(path, format!("{}{}.sustn/\n", content, suffix)) + .map_err(|e| format!("Failed to write .gitignore: {}", e))?; + } else { + // Create .gitignore with the entry + fs::write(path, ".sustn/\n") + .map_err(|e| format!("Failed to create .gitignore: {}", e))?; + } + + println!("[worktree] added .sustn/ to .gitignore"); + Ok(()) +} diff --git a/src-tauri/src/engine_commands.rs b/src-tauri/src/engine_commands.rs index 153d1bf..9f60d3a 100644 --- a/src-tauri/src/engine_commands.rs +++ b/src-tauri/src/engine_commands.rs @@ -22,26 +22,58 @@ pub async fn engine_get_budget(app: AppHandle) -> Result>, repo_path: String, repository_id: String, ) -> Result { println!("[engine] engine_scan_now invoked — repo_path={repo_path}, repository_id={repository_id}"); + // Check concurrency — scans count against the same pool as tasks + { + let tasks = state.running_tasks.lock().await; + let scans = *state.active_scans.lock().await; + let limit = *state.concurrency_limit.read().await; + if tasks.len() + scans >= limit { + return Err(format!( + "Concurrency limit reached ({}/{} in flight)", + tasks.len() + scans, limit + )); + } + } + + // Reserve a scan slot (increment active_scans) + { + let mut scans = state.active_scans.lock().await; + *scans += 1; + } + // Emit scan-started event let _ = app.emit("agent:scan-started", serde_json::json!({ "repositoryId": repository_id, })); // Read project-specific scan preferences - let app_data_dir_for_prefs = app - .path() - .app_data_dir() - .map_err(|e| format!("Failed to get app data dir: {e}"))?; + let app_data_dir_for_prefs = match app.path().app_data_dir() { + Ok(dir) => dir, + Err(e) => { + // Release scan slot on early error + let mut scans = state.active_scans.lock().await; + *scans = scans.saturating_sub(1); + return Err(format!("Failed to get app data dir: {e}")); + } + }; let (_agent_prefs, scan_prefs) = db::read_project_preferences(&app_data_dir_for_prefs, &repository_id); // --- Pass 1: Quick scan (pre-read files, no tool use) --- let result = scanner::scan_repository(&repo_path, scan_prefs.as_deref()).await; + // Release the Pass-1 scan slot. (Pass 2, if spawned, will increment + // its own slot below.) + { + let mut scans = state.active_scans.lock().await; + *scans = scans.saturating_sub(1); + } + // Emit pass 1 completed event let _ = app.emit("agent:scan-completed", serde_json::json!({ "repositoryId": repository_id, @@ -80,6 +112,12 @@ pub async fn engine_scan_now( // Mark this repo as deep-scanning so engine_start_task can wait engine_state_clone.deep_scanning_repos.lock().await.insert(repo_id_clone.clone()); + // Count this scan against the concurrency limit + { + let mut scans = engine_state_clone.active_scans.lock().await; + *scans += 1; + } + let _ = app_clone.emit("agent:scan-deep-started", serde_json::json!({ "repositoryId": repo_id_clone, })); @@ -102,6 +140,8 @@ pub async fn engine_scan_now( "error": format!("Failed to get app data dir: {e}"), })); engine_state_clone.deep_scanning_repos.lock().await.remove(&repo_id_clone); + let mut scans = engine_state_clone.active_scans.lock().await; + *scans = scans.saturating_sub(1); return; } }; @@ -144,6 +184,10 @@ pub async fn engine_scan_now( // Clear deep-scanning flag so waiting tasks can proceed engine_state_clone.deep_scanning_repos.lock().await.remove(&repo_id_clone); + { + let mut scans = engine_state_clone.active_scans.lock().await; + *scans = scans.saturating_sub(1); + } println!("[engine] deep scan flag cleared for repository {repo_id_clone}"); }); } else { @@ -209,44 +253,69 @@ pub async fn engine_start_task( } // Check budget before starting work (respects per-project ceiling override) + // Accounts for tokens already reserved by other in-flight tasks. { let app_data_dir = app .path() .app_data_dir() .map_err(|e| format!("Failed to get app data dir: {e}"))?; let mut config = db::read_budget_config(&app_data_dir); - // Apply per-project ceiling override (may be lower than global) let effective_ceiling = db::read_effective_ceiling_percent(&app_data_dir, &repository_id); if effective_ceiling < config.max_usage_percent { config.max_usage_percent = effective_ceiling; } - let status = budget::calculate_budget_status(&config); + let reserved = *state.tokens_reserved.lock().await; + let status = + budget::calculate_budget_status_with_reservation(&config, reserved); if status.budget_exhausted { - println!("[engine_start_task] BLOCKED — budget exhausted (available={}, ceiling={}%)", status.tokens_available_for_sustn, effective_ceiling); + println!("[engine_start_task] BLOCKED — budget exhausted (available={}, reserved={}, ceiling={}%)", + status.tokens_available_for_sustn, reserved, effective_ceiling); return Err("Budget exhausted — cannot start task".to_string()); } } - // Check if a task is already running + // Check concurrency limit (includes in-flight scans) { - let current = state.current_task.lock().await; - if current.is_some() { - println!("[engine_start_task] BLOCKED — another task already in progress"); - return Err("Another task is already in progress".to_string()); + let tasks = state.running_tasks.lock().await; + let scans = *state.active_scans.lock().await; + let limit = *state.concurrency_limit.read().await; + let in_flight = tasks.len() + scans; + if in_flight >= limit { + println!( + "[engine_start_task] BLOCKED — at concurrency limit ({}/{}; {} tasks + {} scans)", + in_flight, limit, tasks.len(), scans + ); + return Err(format!( + "Concurrency limit reached ({}/{} tasks running)", + in_flight, limit + )); + } + if tasks.contains_key(&task_id) { + return Err("This task is already running".to_string()); } } - // Set current task + // Register task as running { - let mut current = state.current_task.lock().await; - *current = Some(CurrentTask { - task_id: task_id.clone(), - repository_id: repository_id.clone(), - phase: TaskPhase::Implementing, - started_at: chrono::Local::now().to_rfc3339(), - }); + let mut tasks = state.running_tasks.lock().await; + tasks.insert( + task_id.clone(), + CurrentTask { + task_id: task_id.clone(), + repository_id: repository_id.clone(), + phase: TaskPhase::Implementing, + started_at: chrono::Local::now().to_rfc3339(), + }, + ); } - println!("[engine_start_task] current_task set — emitting agent:task-started"); + + // Reserve estimated tokens to prevent over-commit across parallel tasks + let reserved_tokens = budget::estimated_task_tokens(None); + { + let mut reserved = state.tokens_reserved.lock().await; + *reserved += reserved_tokens; + } + println!("[engine_start_task] task registered — reserved {reserved_tokens} tokens"); // Emit task-started event let _ = app.emit("agent:task-started", serde_json::json!({ @@ -263,10 +332,31 @@ pub async fn engine_start_task( db::read_project_preferences(&app_data_dir, &repository_id) }; - // Execute the task + // Create worktree for task isolation + let _ = engine::worktree::ensure_gitignore_entry(&repo_path); + let worktree_path = engine::worktree::create_worktree( + &repo_path, + &task_id, + &branch_name, + &base_branch, + ) + .map_err(|e| { + // Remove task from running_tasks and release reservation on failure + let state_clone = state.inner().clone(); + let tid = task_id.clone(); + tokio::spawn(async move { + state_clone.running_tasks.lock().await.remove(&tid); + let mut reserved = state_clone.tokens_reserved.lock().await; + *reserved = (*reserved - reserved_tokens).max(0); + }); + e + })?; + println!("[engine_start_task] worktree created at {worktree_path}"); + + // Execute the task in the worktree println!("[engine_start_task] calling worker::execute_task — max_retries=4"); let result = worker::execute_task( - &repo_path, + &worktree_path, &task_id, &task_title, &task_description, @@ -277,6 +367,7 @@ pub async fn engine_start_task( user_messages, resume_session_id, agent_prefs.as_deref(), + Some(app.clone()), ) .await; @@ -285,10 +376,14 @@ pub async fn engine_start_task( result.success, result.phase_reached, result.branch_name, result.commit_sha, result.error ); - // Clear current task + // Remove task from running_tasks and release reservation { - let mut current = state.current_task.lock().await; - *current = None; + let mut tasks = state.running_tasks.lock().await; + tasks.remove(&task_id); + } + { + let mut reserved = state.tokens_reserved.lock().await; + *reserved = (*reserved - reserved_tokens).max(0); } // Emit completion event @@ -313,17 +408,36 @@ pub async fn engine_start_task( Ok(result) } +/// Update the maximum number of tasks that can run concurrently. +/// Clamped to [1, 10] to prevent pathological values. +#[tauri::command] +pub async fn engine_set_concurrency_limit( + state: State<'_, Arc>, + limit: usize, +) -> Result<(), String> { + let clamped = limit.clamp(1, 10); + *state.concurrency_limit.write().await = clamped; + println!("[engine] concurrency limit updated to {clamped}"); + Ok(()) +} + /// Get the current engine status. #[tauri::command] pub async fn engine_get_status( state: State<'_, Arc>, ) -> Result { let running = *state.running.read().await; - let current_task = state.current_task.lock().await.clone(); + let tasks = state.running_tasks.lock().await; + let running_tasks: Vec = tasks.values().cloned().collect(); + // Keep current_task for backward compatibility — first running task + let current_task = running_tasks.first().cloned(); + let limit = *state.concurrency_limit.read().await; Ok(EngineStatusResponse { running, current_task, + running_tasks, + concurrency_limit: limit, }) } @@ -514,6 +628,8 @@ Output ONLY a JSON array (one entry per task, same order) with no markdown forma Some(&context), None, None, + None, + None, ) .await?; @@ -631,6 +747,7 @@ pub async fn engine_address_review( review_comments: String, pr_description: String, resume_session_id: Option, + pr_diff: Option, ) -> Result { println!( "[engine_address_review] task_id={task_id}, branch={branch_name}, comments_len={}, resume={:?}", @@ -651,7 +768,7 @@ pub async fn engine_address_review( return Err(format!("Environment issue: {}", issue.error)); } - // Budget check + // Budget check with reservation tracking { let app_data_dir = app.path().app_data_dir() .map_err(|e| format!("Failed to get app data dir: {e}"))?; @@ -660,27 +777,48 @@ pub async fn engine_address_review( if effective_ceiling < config.max_usage_percent { config.max_usage_percent = effective_ceiling; } - let status = budget::calculate_budget_status(&config); + let reserved = *state.tokens_reserved.lock().await; + let status = + budget::calculate_budget_status_with_reservation(&config, reserved); if status.budget_exhausted { return Err("Budget exhausted — cannot address review".to_string()); } } { - let current = state.current_task.lock().await; - if current.is_some() { - return Err("Another task is already in progress".to_string()); + let tasks = state.running_tasks.lock().await; + let scans = *state.active_scans.lock().await; + let limit = *state.concurrency_limit.read().await; + let in_flight = tasks.len() + scans; + if in_flight >= limit { + return Err(format!( + "Concurrency limit reached ({}/{} tasks running)", + in_flight, limit + )); + } + if tasks.contains_key(&task_id) { + return Err("This task is already running".to_string()); } } { - let mut current = state.current_task.lock().await; - *current = Some(CurrentTask { - task_id: task_id.clone(), - repository_id: repository_id.clone(), - phase: TaskPhase::Implementing, - started_at: chrono::Local::now().to_rfc3339(), - }); + let mut tasks = state.running_tasks.lock().await; + tasks.insert( + task_id.clone(), + CurrentTask { + task_id: task_id.clone(), + repository_id: repository_id.clone(), + phase: TaskPhase::Implementing, + started_at: chrono::Local::now().to_rfc3339(), + }, + ); + } + + // Reserve estimated tokens for this review cycle + let reserved_tokens = budget::estimated_task_tokens(Some("low")); + { + let mut reserved = state.tokens_reserved.lock().await; + *reserved += reserved_tokens; } let _ = app.emit("agent:task-started", serde_json::json!({ @@ -699,37 +837,58 @@ pub async fn engine_address_review( _ => String::new(), }; + // For imported PRs on the first cycle (no session), include the full diff + // so Claude understands the PR before addressing comments. + let pr_context_section = match (&resume_session_id, &pr_diff) { + (None, Some(diff)) => format!( + r#" + +## Full PR Diff (you are taking over this PR — study it carefully) +```diff +{diff} +``` +"# + ), + _ => String::new(), + }; + let prompt = format!( r#"IMPORTANT: You are running as an automated background agent in non-interactive mode. Commit your changes directly — do NOT ask for permission. -A human reviewer has left comments on the PR you created. You need to handle EVERY comment — either by making code changes or by drafting a reply. +A human reviewer has left feedback on a PR. You need to handle EVERY item — either by making code changes or by drafting a reply. ## PR Description {pr_description} +{pr_context_section} +## Review Items +Each item below has a COMMENT_ID and a KIND tag that you MUST echo back in your response. -## Review Comments -Each comment below has a COMMENT_ID number that you MUST include in your response. +KIND values and what they mean: +- `inline` — a review comment anchored to a specific diff line. Usually narrow and code-specific. +- `issue` — a general PR comment not tied to a line. May be a question, a request, or plain chat. Not all of them need a code change — sometimes a plain reply is correct. +- `review_summary` — the free-text body a reviewer wrote when submitting a review. Often contains overall asks (e.g. "please split this into two PRs", "looks good but rename X") that complement the per-line comments in the same review. Treat these as first-class feedback. {review_comments} {prefs_section} ## Instructions -For EACH review comment above: +For EACH item above: -1. **If it requires code changes** (bug fix, refactor, improvement, the reviewer is questioning an approach and they're right): make the changes, commit with trailer SUSTN-Task: {task_id}, and draft a reply explaining what you changed. +1. **If it requires code changes** (bug fix, refactor, improvement, a legitimate concern about the approach): make the changes, commit with trailer SUSTN-Task: {task_id}, and draft a reply explaining what you changed. 2. **If it's a question about your reasoning** (why did you do X?): explain your reasoning clearly — you have context from when you wrote this code. -3. **If it's praise or acknowledgment** (looks good, nice, etc.): draft a brief thanks. +3. **If it's conversational** (praise, a check-in, a "what do you think about Y?"): draft a brief, appropriate reply. No code change needed. -CRITICAL: You MUST return a reply for EVERY comment. Use the exact COMMENT_ID number from each comment header above. +CRITICAL: You MUST return a reply for EVERY item. Use the exact COMMENT_ID number and KIND value from the header above each item. After making any code changes and committing, output ONLY this JSON (no markdown): {{ "replies": [ {{ "comment_id": 1234567890, - "reply": "Your response to this specific comment", + "kind": "inline", + "reply": "Your response to this specific item", "made_code_changes": true }} ], @@ -737,30 +896,45 @@ After making any code changes and committing, output ONLY this JSON (no markdown "files_modified": ["list", "of", "files"] }} -The comment_id MUST be the numeric ID from the [COMMENT_ID: ] tag in each comment above. Do NOT use null."# +The comment_id MUST be the numeric ID from the [COMMENT_ID: ] tag. The kind MUST be one of `inline`, `issue`, or `review_summary` matching the [KIND: ...] tag. Do NOT use null for either field."# ); - // Ensure we're on the right branch - if engine::git::branch_exists(&repo_path, &branch_name) { - engine::git::checkout_branch(&repo_path, &branch_name); - } else { - engine::git::create_branch_from(&repo_path, &branch_name, &base_branch); - } + // Create/reuse worktree for task isolation + let _ = engine::worktree::ensure_gitignore_entry(&repo_path); + let worktree_path = engine::worktree::create_worktree( + &repo_path, + &task_id, + &branch_name, + &base_branch, + ) + .map_err(|e| { + let state_clone = state.inner().clone(); + let tid = task_id.clone(); + tokio::spawn(async move { + state_clone.running_tasks.lock().await.remove(&tid); + let mut reserved = state_clone.tokens_reserved.lock().await; + *reserved = (*reserved - reserved_tokens).max(0); + }); + e + })?; + println!("[engine_address_review] using worktree at {worktree_path}"); // Call Claude CLI directly with our exact prompt (not through worker, // which overrides the prompt with its own resume template) let cli_result = engine::invoke_claude_cli( - &repo_path, + &worktree_path, &prompt, 1800, // 30 min timeout None, None, resume_session_id.as_deref(), + Some(app.clone()), + Some(&task_id), ) .await; // Get commit SHA after Claude ran - let sha_result = engine::git::latest_commit_sha(&repo_path); + let sha_result = engine::git::latest_commit_sha(&worktree_path); let commit_sha = if sha_result.success { Some(sha_result.output) } else { None }; let session_id = cli_result.as_ref().ok().and_then(|r| r.session_id.clone()); @@ -795,8 +969,12 @@ The comment_id MUST be the numeric ID from the [COMMENT_ID: ] tag in eac }; { - let mut current = state.current_task.lock().await; - *current = None; + let mut tasks = state.running_tasks.lock().await; + tasks.remove(&task_id); + } + { + let mut reserved = state.tokens_reserved.lock().await; + *reserved = (*reserved - reserved_tokens).max(0); } if success { @@ -817,9 +995,60 @@ The comment_id MUST be the numeric ID from the [COMMENT_ID: ] tag in eac Ok(result) } +/// Remove a task's worktree (cleanup after completion/dismissal). +#[tauri::command] +pub async fn engine_cleanup_worktree( + repo_path: String, + task_id: String, +) -> Result<(), String> { + engine::worktree::remove_worktree(&repo_path, &task_id) +} + +/// Clone a repository (non-blocking, no credential prompts). +#[tauri::command] +pub async fn engine_clone_repo( + url: String, + destination: String, +) -> Result { + // Run on blocking thread since clone can take a while + tokio::task::spawn_blocking(move || { + Ok(engine::git::clone_repo(&url, &destination)) + }) + .await + .map_err(|e| format!("Clone task failed: {}", e))? +} + +/// Fetch a specific branch from origin (with optional PR number for fork fallback). +#[tauri::command] +pub async fn engine_fetch_branch( + repo_path: String, + branch_name: String, + pr_number: Option, +) -> Result { + Ok(engine::git::fetch_branch_with_pr(&repo_path, &branch_name, pr_number)) +} + +/// Get the remote URL for origin. +#[tauri::command] +pub async fn engine_get_remote_url( + repo_path: String, +) -> Result { + let result = engine::git::get_remote_url(&repo_path); + if result.success { + Ok(result.output) + } else { + Err(result.error.unwrap_or_else(|| "Failed to get remote URL".to_string())) + } +} + #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct EngineStatusResponse { pub running: bool, + /// First running task, kept for backward compatibility. pub current_task: Option, + /// All currently running tasks. + pub running_tasks: Vec, + /// Maximum number of tasks that can run concurrently. + pub concurrency_limit: usize, } diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index bbdaf8a..4e1d710 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -90,6 +90,7 @@ pub fn run() { engine_commands::engine_scan_now, engine_commands::engine_start_task, engine_commands::engine_get_status, + engine_commands::engine_set_concurrency_limit, engine_commands::engine_check_schedule, engine_commands::engine_push_branch, engine_commands::engine_list_branches, @@ -98,6 +99,10 @@ pub fn run() { engine_commands::engine_create_pr, engine_commands::engine_augment_tasks, engine_commands::engine_address_review, + engine_commands::engine_cleanup_worktree, + engine_commands::engine_clone_repo, + engine_commands::engine_fetch_branch, + engine_commands::engine_get_remote_url, engine_commands::run_gh_api, engine_commands::run_gh_api_post, engine_commands::run_terminal_command, diff --git a/src-tauri/src/migrations.rs b/src-tauri/src/migrations.rs index fb029bb..987f5b7 100644 --- a/src-tauri/src/migrations.rs +++ b/src-tauri/src/migrations.rs @@ -364,5 +364,69 @@ pub fn migrations() -> Vec { "#, kind: MigrationKind::Up, }, + // Migration 18: worktree path for task isolation + Migration { + version: 18, + description: "add worktree_path to tasks", + sql: r#" + ALTER TABLE tasks ADD COLUMN worktree_path TEXT; + "#, + kind: MigrationKind::Up, + }, + // Migration 19: persisted agent streaming events per task + Migration { + version: 19, + description: "add task_agent_events table for streamed agent output", + sql: r#" + CREATE TABLE IF NOT EXISTS task_agent_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id TEXT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE, + event_type TEXT, + blocks_json TEXT NOT NULL, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP + ); + + CREATE INDEX IF NOT EXISTS idx_task_agent_events_task + ON task_agent_events(task_id); + "#, + kind: MigrationKind::Up, + }, + // Migration 20: seed default concurrency limit + Migration { + version: 20, + description: "seed concurrency_limit global setting", + sql: r#" + INSERT OR IGNORE INTO global_settings (key, value) VALUES + ('concurrency_limit', '5'); + "#, + kind: MigrationKind::Up, + }, + // Migration 21: per-repo scan_enabled flag (default true for existing repos, + // imported repos will set this to false) + Migration { + version: 21, + description: "add scan_enabled to agent_config", + sql: r#" + ALTER TABLE agent_config ADD COLUMN scan_enabled INTEGER NOT NULL DEFAULT 1; + "#, + kind: MigrationKind::Up, + }, + // Migration 22: track comment source kind so issue-level and + // review-summary comments live alongside inline review comments + // without colliding on github_comment_id (different server-side + // ID namespaces can reuse the same integer). + Migration { + version: 22, + description: "add kind to pr_comments and seed rollout cutoff", + sql: r#" + ALTER TABLE pr_comments ADD COLUMN kind TEXT NOT NULL DEFAULT 'inline'; + DROP INDEX IF EXISTS idx_pr_comments_github_id; + CREATE UNIQUE INDEX IF NOT EXISTS idx_pr_comments_kind_github_id + ON pr_comments(kind, github_comment_id); + INSERT OR IGNORE INTO global_settings (key, value) VALUES + ('pr_comments_rollout_cutoff', strftime('%Y-%m-%dT%H:%M:%fZ', 'now')); + "#, + kind: MigrationKind::Up, + }, ] } diff --git a/src/core/api/useEngine.ts b/src/core/api/useEngine.ts index 836149c..620e291 100644 --- a/src/core/api/useEngine.ts +++ b/src/core/api/useEngine.ts @@ -1,4 +1,4 @@ -import { useEffect, useRef } from "react"; +import { useEffect } from "react"; import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query"; import { invoke } from "@tauri-apps/api/core"; import { listen } from "@tauri-apps/api/event"; @@ -98,6 +98,7 @@ export function useUpdateAgentConfig() { scheduleWindowEnd?: string; scanIntervalMinutes?: number; priority?: number; + scanEnabled?: boolean; }) => updateAgentConfig(repositoryId, fields), onSuccess: (config) => { metrics.track("settings_changed", { setting: "agent_config" }); @@ -209,15 +210,17 @@ export function useDeepScanListener(repositoryId: string | undefined) { queryKey: ["tasks", repositoryId], }); - // If a task is actively running, re-invalidate its individual - // query so the TaskDetailView stays in sync after the list refetch. + // Re-invalidate any running task's individual query so the + // TaskDetailView stays in sync after the list refetch. const status = queryClient.getQueryData([ "engine-status", ]); - if (status?.currentTask?.repositoryId === repositoryId) { - void queryClient.invalidateQueries({ - queryKey: ["task", status.currentTask.taskId], - }); + for (const task of status?.runningTasks ?? []) { + if (task.repositoryId === repositoryId) { + void queryClient.invalidateQueries({ + queryKey: ["task", task.taskId], + }); + } } // Notify about new tasks found @@ -622,7 +625,10 @@ export function useStartTask() { handleTaskResult(result, variables, queryClient), onError: (error, variables) => { const msg = error instanceof Error ? error.message : String(error); - if (msg.includes("Another task is already in progress")) { + if ( + msg.includes("Concurrency limit reached") || + msg.includes("Another task is already in progress") + ) { // Race condition: engine busy but stale status didn't show it. // Queue the task instead of failing it. useQueueStore.getState().enqueue(variables); @@ -662,11 +668,25 @@ export function useStartTask() { */ export function useQueueProcessor() { const queryClient = useQueryClient(); - const processingRef = useRef(false); useEffect(() => { async function processNext() { - if (processingRef.current) return; + // Check concurrency capacity — can we run another task? + try { + const engineStatus = + await invoke("engine_get_status"); + const running = engineStatus.runningTasks?.length ?? 0; + const limit = engineStatus.concurrencyLimit ?? 1; + if (running >= limit) { + console.log( + `[queue] at concurrency limit (${running}/${limit}) — waiting`, + ); + return; + } + } catch (e) { + console.error("[queue] status check failed:", e); + return; + } // Check budget before dequeuing try { @@ -682,7 +702,6 @@ export function useQueueProcessor() { const next = useQueueStore.getState().dequeue(); if (!next) return; - processingRef.current = true; console.log( "[queue] starting next task:", next.taskId, @@ -716,7 +735,10 @@ export function useQueueProcessor() { } catch (error) { const msg = error instanceof Error ? error.message : String(error); - if (msg.includes("Another task is already in progress")) { + if ( + msg.includes("Concurrency limit reached") || + msg.includes("Another task is already in progress") + ) { // Engine still busy — put task back and wait for completion event useQueueStore.getState().enqueue(next); await dbUpdateTask(next.taskId, { @@ -732,7 +754,6 @@ export function useQueueProcessor() { await handleTaskError(error, next, queryClient); } } finally { - processingRef.current = false; if (!requeued) { // Chain: check for more queued tasks void processNext(); @@ -754,17 +775,10 @@ export function useQueueProcessor() { }), ); - // Also process when a task is enqueued while engine is idle + // Also process when a task is enqueued and a slot is available const unsub = useQueueStore.subscribe((state, prevState) => { if (state.queue.length > prevState.queue.length) { - // Only start if the engine is actually idle - void invoke("engine_get_status") - .then((status) => { - if (!status.currentTask) { - void processNext(); - } - }) - .catch(() => {}); + void processNext(); } }); @@ -1009,6 +1023,20 @@ export function useStartupRecovery() { }); } }); + + // Sync persisted concurrency limit to the Rust engine state + void getGlobalSettings().then((settings) => { + if (typeof settings.concurrencyLimit === "number") { + void invoke("engine_set_concurrency_limit", { + limit: settings.concurrencyLimit, + }).catch((e) => { + console.warn( + "[startup] failed to sync concurrency limit:", + e, + ); + }); + } + }); }, [queryClient]); } @@ -1043,6 +1071,9 @@ async function runStartupScans(queryClient: ReturnType) { for (const repo of repos) { const config = await getAgentConfig(repo.id); + // Skip if scan-on-discovery is disabled for this repo + if (config.scanEnabled === false) continue; + // Only auto-scan repos that have NEVER been scanned if (config.lastScanAt) continue; diff --git a/src/core/api/useImportPr.ts b/src/core/api/useImportPr.ts new file mode 100644 index 0000000..493fd07 --- /dev/null +++ b/src/core/api/useImportPr.ts @@ -0,0 +1,60 @@ +import { useMutation, useQueryClient } from "@tanstack/react-query"; +import { importPr } from "@core/services/pr-import"; +import { parseOwnerRepo } from "@core/services/github"; +import { useAppStore } from "@core/store/app-store"; +import { + prImportProgressToast, + prImportSuccessToast, + prImportErrorToast, +} from "@ui/lib/toast"; + +export function useImportPr() { + const queryClient = useQueryClient(); + + return useMutation({ + mutationFn: (prUrl: string) => { + const parsed = parseOwnerRepo(prUrl); + const toastId = parsed + ? `pr-import-${parsed.owner}-${parsed.repo}-${parsed.number}` + : `pr-import-${Date.now()}`; + const label = parsed ? `PR #${parsed.number}` : "PR"; + + prImportProgressToast(toastId, `Importing ${label}...`); + + return importPr(prUrl, { + onProgress: (step) => prImportProgressToast(toastId, step), + onRepoReady: (repositoryId) => { + // Refresh sidebar immediately so the project appears + void queryClient.invalidateQueries({ + queryKey: ["repositories"], + }); + useAppStore.getState().setSelectedRepository(repositoryId); + }, + }).then( + (task) => { + prImportSuccessToast(toastId, task.prNumber ?? 0); + return task; + }, + (err) => { + prImportErrorToast( + toastId, + err instanceof Error ? err.message : "Import failed", + ); + throw err; + }, + ); + }, + onSuccess: (task) => { + void queryClient.invalidateQueries({ + queryKey: ["tasks", task.repositoryId], + }); + void queryClient.invalidateQueries({ + queryKey: ["repositories"], + }); + + // Select the imported task + useAppStore.getState().setSelectedRepository(task.repositoryId); + useAppStore.getState().setSelectedTask(task.id); + }, + }); +} diff --git a/src/core/api/useScheduler.ts b/src/core/api/useScheduler.ts index 06f20f4..0d21d68 100644 --- a/src/core/api/useScheduler.ts +++ b/src/core/api/useScheduler.ts @@ -87,10 +87,14 @@ async function runSchedulerTick( // 2. Check if work is allowed right now const workAllowed = shouldWorkNow(settings); - // 3. Check if engine is busy + // 3. Check if engine is at concurrency capacity const status = await invoke("engine_get_status"); - if (status.currentTask) { - console.log("[scheduler] engine busy — skipping tick"); + const running = status.runningTasks?.length ?? 0; + const limit = status.concurrencyLimit ?? 1; + if (running >= limit) { + console.log( + `[scheduler] at concurrency limit (${running}/${limit}) — skipping tick`, + ); return; } @@ -111,7 +115,10 @@ async function runSchedulerTick( if (!agentConfig.enabled) continue; // ── Auto-scan ────────────────────────────────────── - if (isScanDue(agentConfig.lastScanAt, settings.scanFrequency)) { + if ( + agentConfig.scanEnabled !== false && + isScanDue(agentConfig.lastScanAt, settings.scanFrequency) + ) { console.log(`[scheduler] scan due for ${repo.name} — triggering`); try { diff --git a/src/core/api/useSettings.ts b/src/core/api/useSettings.ts index d4289f5..cd56447 100644 --- a/src/core/api/useSettings.ts +++ b/src/core/api/useSettings.ts @@ -1,4 +1,5 @@ import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query"; +import { invoke } from "@tauri-apps/api/core"; import { getGlobalSettings, updateGlobalSetting, @@ -24,19 +25,37 @@ export function useUpdateGlobalSetting() { const queryClient = useQueryClient(); return useMutation({ - mutationFn: ({ + mutationFn: async ({ key, value, }: { key: keyof GlobalSettings; value: unknown; - }) => updateGlobalSetting(key, value), + }) => { + await updateGlobalSetting(key, value); + // Propagate runtime-sensitive settings to the Rust engine + if (key === "concurrencyLimit" && typeof value === "number") { + try { + await invoke("engine_set_concurrency_limit", { + limit: value, + }); + } catch (e) { + console.warn( + "[useSettings] failed to sync concurrency limit:", + e, + ); + } + } + }, onSuccess: (_data, variables) => { metrics.track("settings_changed", { setting: variables.key }); savedToast(); void queryClient.invalidateQueries({ queryKey: ["global-settings"], }); + void queryClient.invalidateQueries({ + queryKey: ["engine-status"], + }); }, }); } diff --git a/src/core/api/useTasks.ts b/src/core/api/useTasks.ts index 9ab14ff..186c128 100644 --- a/src/core/api/useTasks.ts +++ b/src/core/api/useTasks.ts @@ -1,4 +1,5 @@ import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query"; +import { invoke } from "@tauri-apps/api/core"; import { listTasks, getTask, @@ -11,6 +12,7 @@ import { createMessage as dbCreateMessage, listMessages as dbListMessages, } from "@core/db/tasks"; +import { listRepositories } from "@core/db/repositories"; import type { Task, TaskCategory, @@ -20,6 +22,23 @@ import type { } from "@core/types/task"; import { metrics } from "@core/services/metrics"; +async function cleanupWorktree( + repositoryId: string, + taskId: string, +): Promise { + try { + const repos = await listRepositories(); + const repo = repos.find((r) => r.id === repositoryId); + if (!repo) return; + await invoke("engine_cleanup_worktree", { + repoPath: repo.path, + taskId, + }); + } catch (e) { + console.warn("[useTasks] worktree cleanup failed:", e); + } +} + export function useTasks( repositoryId: string | undefined, baseBranch?: string, @@ -94,6 +113,11 @@ export function useUpdateTask() { void queryClient.invalidateQueries({ queryKey: ["task-events", task.id], }); + + // Clean up worktree when task reaches a terminal state + if (task.state === "done" || task.state === "dismissed") { + void cleanupWorktree(task.repositoryId, task.id); + } }, }); } diff --git a/src/core/db/agent-config.ts b/src/core/db/agent-config.ts index 86d99d2..595a3b8 100644 --- a/src/core/db/agent-config.ts +++ b/src/core/db/agent-config.ts @@ -17,6 +17,7 @@ interface AgentConfigRow { last_scan_at: string | null; last_work_at: string | null; priority: number; + scan_enabled: number; } interface BudgetConfigRow { @@ -43,6 +44,7 @@ function rowToAgentConfig(row: AgentConfigRow): AgentConfig { lastScanAt: row.last_scan_at ?? undefined, lastWorkAt: row.last_work_at ?? undefined, priority: row.priority, + scanEnabled: (row.scan_enabled ?? 1) === 1, }; } @@ -83,6 +85,7 @@ export async function updateAgentConfig( | "scheduleWindowEnd" | "scanIntervalMinutes" | "priority" + | "scanEnabled" > >, ): Promise { @@ -122,6 +125,10 @@ export async function updateAgentConfig( setClauses.push(`priority = $${paramIndex++}`); values.push(fields.priority); } + if (fields.scanEnabled !== undefined) { + setClauses.push(`scan_enabled = $${paramIndex++}`); + values.push(fields.scanEnabled ? 1 : 0); + } if (setClauses.length > 0) { await db.execute( diff --git a/src/core/db/pr-lifecycle.ts b/src/core/db/pr-lifecycle.ts index 6ee835d..c6280c4 100644 --- a/src/core/db/pr-lifecycle.ts +++ b/src/core/db/pr-lifecycle.ts @@ -6,7 +6,7 @@ import Database from "@tauri-apps/plugin-sql"; import { invoke } from "@tauri-apps/api/core"; import { config } from "@core/config"; -import type { PrReview, PrComment } from "@core/types/task"; +import type { PrReview, PrComment, PrCommentKind } from "@core/types/task"; async function getDb() { return await Database.load(config.dbUrl); @@ -114,6 +114,7 @@ interface PrCommentRow { id: string; task_id: string; github_comment_id: number; + kind: string; in_reply_to_id: number | null; reviewer: string; body: string; @@ -133,6 +134,7 @@ function rowToComment(row: PrCommentRow): PrComment { id: row.id, taskId: row.task_id, githubCommentId: row.github_comment_id, + kind: (row.kind as PrCommentKind) ?? "inline", inReplyToId: row.in_reply_to_id ?? undefined, reviewer: row.reviewer, body: row.body, @@ -153,6 +155,7 @@ export async function upsertComment( taskId: string, comment: { githubCommentId: number; + kind: PrCommentKind; inReplyToId?: number; reviewer: string; body: string; @@ -160,39 +163,47 @@ export async function upsertComment( line?: number; side?: string; commitId?: string; + /** + * When the source comment predates the non-inline-comment rollout, + * the service passes a commit-sha-like sentinel so the row is + * inserted as already-addressed and never re-sent to Claude. + */ + preAddressedSentinel?: string; }, ): Promise { const db = await getDb(); - // Check if already exists — update body if so + // Check if already exists — match on (kind, github_comment_id) because + // review / pull-comment / issue-comment IDs share an integer space but + // come from different server-side tables and can collide. const existing = await db.select( - "SELECT * FROM pr_comments WHERE github_comment_id = $1", - [comment.githubCommentId], + "SELECT * FROM pr_comments WHERE kind = $1 AND github_comment_id = $2", + [comment.kind, comment.githubCommentId], ); if (existing.length > 0) { - // Update body in case it changed if (existing[0].body !== comment.body) { await db.execute( - "UPDATE pr_comments SET body = $1, updated_at = CURRENT_TIMESTAMP WHERE github_comment_id = $2", - [comment.body, comment.githubCommentId], + "UPDATE pr_comments SET body = $1, updated_at = CURRENT_TIMESTAMP WHERE kind = $2 AND github_comment_id = $3", + [comment.body, comment.kind, comment.githubCommentId], ); } const rows = await db.select( - "SELECT * FROM pr_comments WHERE github_comment_id = $1", - [comment.githubCommentId], + "SELECT * FROM pr_comments WHERE kind = $1 AND github_comment_id = $2", + [comment.kind, comment.githubCommentId], ); return rowToComment(rows[0]); } const id = await invoke("generate_task_id"); await db.execute( - `INSERT INTO pr_comments (id, task_id, github_comment_id, in_reply_to_id, reviewer, body, path, line, side, commit_id) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + `INSERT INTO pr_comments (id, task_id, github_comment_id, kind, in_reply_to_id, reviewer, body, path, line, side, commit_id, addressed_in_commit, classification) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`, [ id, taskId, comment.githubCommentId, + comment.kind, comment.inReplyToId ?? null, comment.reviewer, comment.body, @@ -200,6 +211,8 @@ export async function upsertComment( comment.line ?? null, comment.side ?? null, comment.commitId ?? null, + comment.preAddressedSentinel ?? null, + comment.preAddressedSentinel ? "resolved" : null, ], ); @@ -245,27 +258,44 @@ export async function updateCommentClassification( } export async function markCommentAddressed( + kind: PrCommentKind, githubCommentId: number, commitSha: string, ): Promise { const db = await getDb(); await db.execute( - "UPDATE pr_comments SET addressed_in_commit = $1, classification = 'resolved', updated_at = CURRENT_TIMESTAMP WHERE github_comment_id = $2", - [commitSha, githubCommentId], + "UPDATE pr_comments SET addressed_in_commit = $1, classification = 'resolved', updated_at = CURRENT_TIMESTAMP WHERE kind = $2 AND github_comment_id = $3", + [commitSha, kind, githubCommentId], ); } export async function setCommentReply( + kind: PrCommentKind, githubCommentId: number, reply: string, ): Promise { const db = await getDb(); await db.execute( - "UPDATE pr_comments SET our_reply = $1, updated_at = CURRENT_TIMESTAMP WHERE github_comment_id = $2", - [reply, githubCommentId], + "UPDATE pr_comments SET our_reply = $1, updated_at = CURRENT_TIMESTAMP WHERE kind = $2 AND github_comment_id = $3", + [reply, kind, githubCommentId], ); } +/** + * Read the rollout cutoff timestamp stamped at migration 22 install time. + * Comments/reviews with source timestamp <= cutoff are treated as + * pre-existing history and not fed back to Claude. + */ +export async function getPrCommentsRolloutCutoff(): Promise< + string | undefined +> { + const db = await getDb(); + const rows = await db.select<{ value: string }[]>( + "SELECT value FROM global_settings WHERE key = 'pr_comments_rollout_cutoff'", + ); + return rows[0]?.value; +} + /** Get all tasks that have an active PR lifecycle (for polling) */ export async function getTasksWithActivePr(): Promise< { diff --git a/src/core/db/settings.ts b/src/core/db/settings.ts index ebb2958..3cf79a5 100644 --- a/src/core/db/settings.ts +++ b/src/core/db/settings.ts @@ -53,6 +53,7 @@ const KEY_MAP: Record = { linear_enabled: "linearEnabled", pr_lifecycle_enabled: "prLifecycleEnabled", max_review_cycles: "maxReviewCycles", + concurrency_limit: "concurrencyLimit", }; const REVERSE_KEY_MAP: Record = Object.fromEntries( @@ -75,6 +76,7 @@ function parseValue(camelKey: string, raw: string): unknown { return raw ? (raw.split(",") as ScheduleDay[]) : []; if (camelKey === "budgetCeilingPercent") return parseInt(raw, 10); if (camelKey === "maxReviewCycles") return parseInt(raw, 10); + if (camelKey === "concurrencyLimit") return parseInt(raw, 10); return raw; } @@ -109,6 +111,7 @@ const DEFAULTS: GlobalSettings = { linearEnabled: false, prLifecycleEnabled: true, maxReviewCycles: 5, + concurrencyLimit: 5, }; export async function getGlobalSettings(): Promise { diff --git a/src/core/db/task-agent-events.ts b/src/core/db/task-agent-events.ts new file mode 100644 index 0000000..e458b41 --- /dev/null +++ b/src/core/db/task-agent-events.ts @@ -0,0 +1,73 @@ +/** + * Persisted agent streaming events. + * + * Stores the output emitted from Claude CLI during task execution so users + * can view it after streaming ends or on app reload. + */ + +import Database from "@tauri-apps/plugin-sql"; +import { config } from "@core/config"; +import type { TaskOutputEvent, ContentBlock } from "@core/types/agent"; + +async function getDb() { + return await Database.load(config.dbUrl); +} + +interface EventRow { + id: number; + task_id: string; + event_type: string | null; + blocks_json: string; + timestamp: string; +} + +function rowToEvent(row: EventRow): TaskOutputEvent { + let blocks: ContentBlock[] = []; + try { + blocks = JSON.parse(row.blocks_json) as ContentBlock[]; + } catch { + blocks = []; + } + return { + taskId: row.task_id, + eventType: row.event_type ?? undefined, + blocks, + raw: "", + timestamp: row.timestamp, + }; +} + +export async function saveAgentEvent(event: TaskOutputEvent): Promise { + const db = await getDb(); + await db.execute( + `INSERT INTO task_agent_events (task_id, event_type, blocks_json, timestamp) + VALUES ($1, $2, $3, $4)`, + [ + event.taskId, + event.eventType ?? null, + JSON.stringify(event.blocks), + event.timestamp, + ], + ); +} + +export async function listAgentEvents( + taskId: string, +): Promise { + const db = await getDb(); + const rows = await db.select( + `SELECT id, task_id, event_type, blocks_json, timestamp + FROM task_agent_events + WHERE task_id = $1 + ORDER BY id ASC`, + [taskId], + ); + return rows.map(rowToEvent); +} + +export async function clearAgentEvents(taskId: string): Promise { + const db = await getDb(); + await db.execute(`DELETE FROM task_agent_events WHERE task_id = $1`, [ + taskId, + ]); +} diff --git a/src/core/db/tasks.ts b/src/core/db/tasks.ts index b6ea4b5..2803f50 100644 --- a/src/core/db/tasks.ts +++ b/src/core/db/tasks.ts @@ -30,6 +30,7 @@ interface TaskRow { files_involved: string | null; base_branch: string | null; branch_name: string | null; + worktree_path: string | null; commit_sha: string | null; session_id: string | null; linear_issue_id: string | null; @@ -109,6 +110,7 @@ function rowToTask(row: TaskRow): Task { filesInvolved: parseFilesInvolved(row.files_involved), baseBranch: row.base_branch ?? undefined, branchName: row.branch_name ?? undefined, + worktreePath: row.worktree_path ?? undefined, commitSha: row.commit_sha ?? undefined, sessionId: row.session_id ?? undefined, linearIssueId: row.linear_issue_id ?? undefined, @@ -268,6 +270,7 @@ export async function updateTask( | "category" | "baseBranch" | "branchName" + | "worktreePath" | "commitSha" | "sessionId" | "lastError" @@ -329,6 +332,10 @@ export async function updateTask( setClauses.push(`branch_name = $${paramIndex++}`); values.push(fields.branchName); } + if (fields.worktreePath !== undefined) { + setClauses.push(`worktree_path = $${paramIndex++}`); + values.push(fields.worktreePath); + } if (fields.commitSha !== undefined) { setClauses.push(`commit_sha = $${paramIndex++}`); values.push(fields.commitSha); @@ -582,6 +589,63 @@ export async function createScannedTask( return rowToTask(rows[0]); } +/** + * Create a task from an imported PR. + * The task starts in "review" state with PR fields pre-populated. + */ +export async function createImportedTask( + repositoryId: string, + task: { + title: string; + description: string | undefined; + baseBranch: string; + branchName: string; + prUrl: string; + prNumber: number; + }, + sortOrder: number, +): Promise { + const db = await getDb(); + const id = await invoke("generate_task_id"); + + await db.execute( + `INSERT INTO tasks (id, repository_id, title, description, category, sort_order, source, state, base_branch, branch_name, pr_url, pr_number, pr_state) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`, + [ + id, + repositoryId, + task.title, + task.description ?? null, + "general", + sortOrder, + "imported", + "review", + task.baseBranch, + task.branchName, + task.prUrl, + task.prNumber, + "in_review", + ], + ); + + await recordEvent( + db, + id, + "created", + undefined, + undefined, + undefined, + "Imported from PR", + ); + + const rows = await db.select( + "SELECT * FROM tasks WHERE id = $1", + [id], + ); + + return rowToTask(rows[0]); +} + /** * Insert a single Linear-sourced task into the DB. */ diff --git a/src/core/services/github.ts b/src/core/services/github.ts index 366320e..4474fb9 100644 --- a/src/core/services/github.ts +++ b/src/core/services/github.ts @@ -30,6 +30,15 @@ export interface GhPrComment { updated_at: string; } +/** Issue-level comment on a PR (general discussion, not tied to a line). */ +export interface GhIssueComment { + id: number; + user: { login: string }; + body: string; + created_at: string; + updated_at: string; +} + export interface GhPrStatus { state: "open" | "closed" | "merged"; merged: boolean; @@ -78,6 +87,39 @@ async function ghApiPost( return JSON.parse(result.stdout) as T; } +// ── PR Metadata ──────────────────────────────────────────── + +export interface GhPrMetadata { + title: string; + body: string; + headBranch: string; + baseBranch: string; + user: { login: string }; + state: "open" | "closed"; + merged: boolean; +} + +export async function getPrMetadata( + repoPath: string, + owner: string, + repo: string, + prNumber: number, +): Promise { + const raw = await ghApi>( + repoPath, + `repos/${owner}/${repo}/pulls/${prNumber}`, + ); + return { + title: raw.title as string, + body: (raw.body as string) ?? "", + headBranch: (raw.head as { ref: string }).ref, + baseBranch: (raw.base as { ref: string }).ref, + user: { login: (raw.user as { login: string }).login }, + state: raw.state as "open" | "closed", + merged: raw.merged as boolean, + }; +} + // ── PR Status ─────────────────────────────────────────────── export async function getPrStatus( @@ -152,6 +194,57 @@ export async function postPrComment( ); } +// ── Issue-level Comments ──────────────────────────────────── + +/** + * List issue-level comments on a PR (general conversation, not tied to a + * diff line). These come from the issues endpoint because PRs are issues + * on the GitHub data model. + */ +export async function listIssueComments( + repoPath: string, + owner: string, + repo: string, + prNumber: number, +): Promise { + return ghApi( + repoPath, + `repos/${owner}/${repo}/issues/${prNumber}/comments`, + ); +} + +/** + * Marker appended to every bot-authored issue comment so we can identify + * and skip them on the next fetch, without having to resolve the gh + * user's login. More robust than login matching if auth is swapped. + */ +export const SUSTN_MARKER_PREFIX = "`; +} + +export function bodyHasSustnMarker(body: string): boolean { + return body.includes(SUSTN_MARKER_PREFIX); +} + +/** + * Post an issue comment on a PR with a trailing marker identifying it as + * authored by SUSTN for a given task. The marker is invisible in GitHub's + * rendered markdown but lets us dedup on fetch. + */ +export async function postPrCommentWithMarker( + repoPath: string, + owner: string, + repo: string, + prNumber: number, + taskId: string, + body: string, +): Promise<{ id: number }> { + const stamped = `${body}\n\n${sustnMarker(taskId)}`; + return postPrComment(repoPath, owner, repo, prNumber, stamped); +} + // ── Re-request Review ─────────────────────────────────────── export async function requestReview( @@ -190,3 +283,74 @@ export async function getPrDiff( } return result.stdout; } + +// ── Resolved Threads ────────────────────────────────────── + +/** + * Fetch the set of root comment IDs whose review thread has been + * marked as "Resolved" on GitHub. + * + * Uses the GraphQL API because the REST API does not expose + * thread resolution status. + */ +export async function getResolvedThreadCommentIds( + repoPath: string, + owner: string, + repo: string, + prNumber: number, +): Promise> { + const query = ` + query($owner: String!, $repo: String!, $number: Int!) { + repository(owner: $owner, name: $repo) { + pullRequest(number: $number) { + reviewThreads(first: 100) { + nodes { + isResolved + comments(first: 1) { + nodes { databaseId } + } + } + } + } + } + } + `; + + try { + const result = await ghApiPost<{ + data?: { + repository?: { + pullRequest?: { + reviewThreads?: { + nodes?: Array<{ + isResolved: boolean; + comments: { + nodes: Array<{ databaseId: number }>; + }; + }>; + }; + }; + }; + }; + }>(repoPath, "graphql", { + query, + variables: { owner, repo, number: prNumber }, + }); + + const threads = + result.data?.repository?.pullRequest?.reviewThreads?.nodes ?? []; + const resolvedIds = new Set(); + for (const thread of threads) { + if (thread.isResolved) { + const rootComment = thread.comments.nodes[0]; + if (rootComment) { + resolvedIds.add(rootComment.databaseId); + } + } + } + return resolvedIds; + } catch (e) { + console.warn(`[github] failed to fetch resolved threads:`, e); + return new Set(); + } +} diff --git a/src/core/services/pr-import.ts b/src/core/services/pr-import.ts new file mode 100644 index 0000000..138be30 --- /dev/null +++ b/src/core/services/pr-import.ts @@ -0,0 +1,224 @@ +/** + * PR Import Service + * + * Orchestrates importing an external PR into SUSTN: + * parse URL → match/clone repo → fetch metadata → create task → trigger lifecycle + */ + +import { invoke } from "@tauri-apps/api/core"; +import { parseOwnerRepo, getPrMetadata } from "@core/services/github"; +import { listRepositories, addRepository } from "@core/db/repositories"; +import { createImportedTask, listTasks } from "@core/db/tasks"; +import { + getAgentConfig, + updateAgentConfig, + updateLastScanAt, +} from "@core/db/agent-config"; +import type { Task } from "@core/types/task"; + +export type ImportProgress = (step: string) => void; +export type ImportCallbacks = { + onProgress?: ImportProgress; + onRepoReady?: (repositoryId: string) => void; +}; + +/** + * Import a GitHub PR into SUSTN. + * Returns the created task, ready for the PR lifecycle poller to pick up. + */ +export async function importPr( + prUrl: string, + callbacks?: ImportCallbacks, +): Promise { + const progress = callbacks?.onProgress ?? (() => {}); + + // 1. Parse the PR URL + const parsed = parseOwnerRepo(prUrl); + if (!parsed) { + throw new Error( + "Invalid PR URL. Expected: https://github.com/owner/repo/pull/123", + ); + } + const { owner, repo, number: prNumber } = parsed; + console.log(`[pr-import] importing ${owner}/${repo}#${prNumber}`); + + // 2. Find or clone the repository + progress(`Looking for ${owner}/${repo}...`); + const repoPath = await resolveRepository(owner, repo, progress); + const repos = await listRepositories(); + const repository = repos.find((r) => r.path === repoPath); + if (!repository) { + throw new Error(`Repository not found after resolution: ${repoPath}`); + } + + // Signal that the repo is ready (so sidebar can refresh immediately) + callbacks?.onRepoReady?.(repository.id); + + // 3. Fetch PR metadata from GitHub + progress(`Fetching PR #${prNumber} details...`); + const metadata = await getPrMetadata(repoPath, owner, repo, prNumber); + if (metadata.merged) { + throw new Error("This PR has already been merged."); + } + if (metadata.state === "closed") { + throw new Error("This PR is closed."); + } + + console.log( + `[pr-import] PR #${prNumber}: "${metadata.title}" (${metadata.headBranch} → ${metadata.baseBranch})`, + ); + + // 4. Fetch the PR branch locally + progress(`Fetching branch ${metadata.headBranch}...`); + await invoke("engine_fetch_branch", { + repoPath, + branchName: metadata.headBranch, + prNumber, + }); + + // 5. Create the task + progress("Creating task..."); + const existingTasks = await listTasks(repository.id); + const maxSortOrder = + existingTasks.length > 0 + ? Math.max(...existingTasks.map((t) => t.sortOrder)) + : 0; + + const task = await createImportedTask( + repository.id, + { + title: `[PR #${prNumber}] ${metadata.title}`, + description: metadata.body || undefined, + baseBranch: metadata.baseBranch, + branchName: metadata.headBranch, + prUrl, + prNumber, + }, + maxSortOrder + 1, + ); + console.log(`[pr-import] task created: ${task.id}`); + + // Imported repos shouldn't auto-scan for tasks — the user came here to + // manage an existing PR, not to start a scan-driven backlog. Disable + // scanning permanently for this repo; user can re-enable in project settings. + try { + await getAgentConfig(repository.id); + await updateAgentConfig(repository.id, { scanEnabled: false }); + await updateLastScanAt(repository.id); + } catch { + // Non-critical + } + + // Trigger PR lifecycle immediately (don't wait for 2-min poll) + try { + const { processTaskPr } = await import("@core/services/pr-lifecycle"); + const { getGlobalSettings, getProjectOverrides } = + await import("@core/db/settings"); + const settings = await getGlobalSettings(); + const overrides = await getProjectOverrides(repository.id); + const autoReply = + overrides.overridePrAutoReply ?? settings.prLifecycleEnabled; + + // Re-fetch the task so it has all fields populated + const { getTask } = await import("@core/db/tasks"); + const freshTask = await getTask(task.id); + if (freshTask) { + void processTaskPr( + freshTask, + repoPath, + settings.maxReviewCycles ?? 5, + autoReply, + ); + } + } catch (e) { + console.warn(`[pr-import] failed to trigger immediate lifecycle:`, e); + } + + return task; +} + +/** + * Find a local repository matching the GitHub owner/repo, + * or clone it if not found. + */ +async function resolveRepository( + owner: string, + repo: string, + progress: ImportProgress, +): Promise { + const repos = await listRepositories(); + const githubSuffix = `${owner}/${repo}`; + + // Check each repo's remote URL for a match + for (const r of repos) { + try { + const remoteUrl = await invoke("engine_get_remote_url", { + repoPath: r.path, + }); + if ( + remoteUrl.includes(githubSuffix) || + remoteUrl.includes(`${githubSuffix}.git`) + ) { + console.log( + `[pr-import] matched existing repo: ${r.name} (${r.path})`, + ); + return r.path; + } + } catch { + // Skip repos where remote URL can't be read + } + } + + // No match — clone the repo + progress(`Cloning ${owner}/${repo} — this may take a minute...`); + console.log(`[pr-import] no matching repo, cloning ${owner}/${repo}`); + const cloneUrl = `https://github.com/${owner}/${repo}.git`; + + const defaultDir = await invoke("get_default_clone_dir"); + const destination = `${defaultDir}/${repo}`; + + // Check if destination already exists (from a previous clone attempt) + let clonedPath: string; + try { + const validateResult = await invoke<{ + valid: boolean; + error: string | null; + }>("validate_git_repo", { path: destination }); + if (validateResult.valid) { + console.log(`[pr-import] reusing existing clone at ${destination}`); + clonedPath = destination; + } else { + throw new Error("not a git repo"); + } + } catch { + // Destination doesn't exist or isn't a git repo — clone it + const result = await invoke<{ + success: boolean; + output: string; + error: string | null; + }>("engine_clone_repo", { + url: cloneUrl, + destination, + }); + + if (!result.success) { + throw new Error( + `Failed to clone ${owner}/${repo}: ${result.error ?? "unknown error"}`, + ); + } + clonedPath = destination; + } + + // Add to SUSTN (may already exist from a previous attempt) + try { + await addRepository(clonedPath, repo); + console.log(`[pr-import] cloned and added: ${clonedPath}`); + } catch (e) { + const msg = e instanceof Error ? e.message : String(e); + if (!msg.includes("already been added")) { + throw e; + } + } + + return clonedPath; +} diff --git a/src/core/services/pr-lifecycle.ts b/src/core/services/pr-lifecycle.ts index 3df24ef..741305f 100644 --- a/src/core/services/pr-lifecycle.ts +++ b/src/core/services/pr-lifecycle.ts @@ -18,29 +18,55 @@ import { listComments, markCommentAddressed, setCommentReply, + getPrCommentsRolloutCutoff, } from "@core/db/pr-lifecycle"; import { parseOwnerRepo, listPrReviews, listPrComments, + listIssueComments, getPrStatus, replyToComment, + postPrCommentWithMarker, + bodyHasSustnMarker, requestReview, } from "@core/services/github"; import { listRepositories } from "@core/db/repositories"; import { getGlobalSettings, getProjectOverrides } from "@core/db/settings"; import type { WorkResult } from "@core/types/agent"; -import type { PrState, Task } from "@core/types/task"; -import type { GhPrComment, GhPrReview } from "@core/services/github"; +import type { PrState, PrCommentKind, Task } from "@core/types/task"; +import type { + GhPrComment, + GhPrReview, + GhIssueComment, +} from "@core/services/github"; // ── Claude Response Parsing ───────────────────────────────── interface ClaudeReviewReply { comment_id: number; + /** + * Which source the comment came from. Claude echoes this back from the + * [KIND: ...] tag we put in each comment header so we know which + * GitHub endpoint to post the reply to. + */ + kind?: PrCommentKind; reply: string; made_code_changes: boolean; } +/** + * Sentinel stored in `addressed_in_commit` for rows synthesized from PR + * history that predates the non-inline-comment rollout. These rows are + * never sent to Claude — they exist only to dedup future fetches. + */ +const PRE_ROLLOUT_SENTINEL = "pre-rollout"; + +function isAfterCutoff(ts: string, cutoff: string | undefined): boolean { + if (!cutoff) return true; + return new Date(ts).getTime() > new Date(cutoff).getTime(); +} + // ── Core Lifecycle ────────────────────────────────────────── /** @@ -88,6 +114,15 @@ export async function processTaskPr( completedAt: new Date().toISOString(), }); await recordPrEvent(task.id, "pr_merged", `PR #${prNumber} merged`); + // Clean up worktree + try { + await invoke("engine_cleanup_worktree", { + repoPath: repoPath, + taskId: task.id, + }); + } catch (e) { + console.warn(`[pr-lifecycle] worktree cleanup failed:`, e); + } return; } @@ -138,7 +173,13 @@ export async function processTaskPr( new Date(a.submitted_at).getTime(), )[0]; - // 4. Always sync comments regardless of review state + // 4. Always sync comments regardless of review state. + // We gather three sources: + // - inline review comments (diff-line-anchored) + // - issue-level PR comments (general conversation) + // - review summary bodies (the free text on a review submission) + // All three land in pr_comments tagged by `kind` so downstream + // processing is uniform. console.log(`[pr-lifecycle] fetching comments for PR #${prNumber}`); let ghComments: GhPrComment[]; try { @@ -148,7 +189,24 @@ export async function processTaskPr( ghComments = []; } - // Filter out our own replies + let ghIssueComments: GhIssueComment[]; + try { + ghIssueComments = await listIssueComments( + repoPath, + owner, + repo, + prNumber, + ); + } catch (e) { + console.error(`[pr-lifecycle] failed to fetch issue comments:`, e); + ghIssueComments = []; + } + + const rolloutCutoff = await getPrCommentsRolloutCutoff(); + + // Filter out our own replies (inline comments we've already posted — + // matched by body text because the inline replies API doesn't let us + // inject markers). const existingDbComments = await listComments(task.id); const ourReplyBodies = new Set( existingDbComments @@ -167,13 +225,17 @@ export async function processTaskPr( }); console.log( - `[pr-lifecycle] PR #${prNumber} — ${ghComments.length} total comment(s), ${externalComments.length} external`, + `[pr-lifecycle] PR #${prNumber} — ${ghComments.length} inline (${externalComments.length} external), ${ghIssueComments.length} issue comment(s)`, ); - // Sync external comments to DB + // Sync inline review comments for (const comment of externalComments) { + const preAddressed = !isAfterCutoff(comment.created_at, rolloutCutoff) + ? PRE_ROLLOUT_SENTINEL + : undefined; await upsertComment(task.id, { githubCommentId: comment.id, + kind: "inline", inReplyToId: comment.in_reply_to_id ?? undefined, reviewer: comment.user.login, body: comment.body, @@ -181,13 +243,82 @@ export async function processTaskPr( line: comment.line ?? comment.original_line ?? undefined, side: comment.side ?? undefined, commitId: comment.commit_id ?? undefined, + preAddressedSentinel: preAddressed, + }); + } + + // Sync issue-level PR comments. Skip any that carry our marker — + // those are comments this agent posted previously. Everything else + // is external content we need to address. + for (const issueComment of ghIssueComments) { + if (bodyHasSustnMarker(issueComment.body)) continue; + const preAddressed = !isAfterCutoff( + issueComment.created_at, + rolloutCutoff, + ) + ? PRE_ROLLOUT_SENTINEL + : undefined; + await upsertComment(task.id, { + githubCommentId: issueComment.id, + kind: "issue", + reviewer: issueComment.user.login, + body: issueComment.body, + preAddressedSentinel: preAddressed, + }); + } + + // Synthesize a review_summary row for every review that carries a + // non-empty body. github_comment_id uses the review's id; because + // pr_comments is keyed by (kind, github_comment_id), this won't + // collide with pull-comment ids that happen to share the integer. + for (const review of reviews) { + const body = review.body?.trim(); + if (!body) continue; + const preAddressed = !isAfterCutoff(review.submitted_at, rolloutCutoff) + ? PRE_ROLLOUT_SENTINEL + : undefined; + await upsertComment(task.id, { + githubCommentId: review.id, + kind: "review_summary", + reviewer: review.user.login, + body: `[${review.state}] ${body}`, + preAddressedSentinel: preAddressed, }); } // 5. Determine which comments need processing const refreshedDbComments = await listComments(task.id); + + // Fetch resolved thread IDs from GitHub to skip already-resolved conversations + let resolvedIds = new Set(); + try { + const { getResolvedThreadCommentIds } = + await import("@core/services/github"); + resolvedIds = await getResolvedThreadCommentIds( + repoPath, + owner, + repo, + prNumber, + ); + if (resolvedIds.size > 0) { + console.log( + `[pr-lifecycle] PR #${prNumber} — ${resolvedIds.size} resolved thread(s), skipping`, + ); + } + } catch (e) { + console.warn(`[pr-lifecycle] failed to fetch resolved threads:`, e); + } + const unprocessedComments = refreshedDbComments.filter( - (c) => !c.ourReply && !c.addressedInCommit && !c.inReplyToId, + (c) => + !c.ourReply && + !c.addressedInCommit && + !c.inReplyToId && + // resolvedIds comes from the inline review-threads GraphQL + // query, so it's only meaningful for inline rows. Issue and + // review_summary rows share an integer ID namespace with + // inline rows — don't drop them on a coincidental collision. + (c.kind !== "inline" || !resolvedIds.has(c.githubCommentId)), ); if (task.prState === "opened") { @@ -274,12 +405,34 @@ export async function processTaskPr( ); await updateTask(task.id, { prState: "addressing" as PrState }); + // For imported PRs on the first addressing cycle, fetch the full diff + // so Claude has context about the PR before addressing comments. + let prDiff: string | undefined; + if (task.source === "imported" && !task.sessionId) { + try { + const { getPrDiff } = await import("@core/services/github"); + prDiff = await getPrDiff(repoPath, owner, repo, prNumber); + console.log( + `[pr-lifecycle] fetched PR diff for imported task (${prDiff.length} chars)`, + ); + } catch (e) { + console.warn(`[pr-lifecycle] failed to fetch PR diff:`, e); + } + } + const reviewContext = unprocessedComments .map((c) => { - const location = c.path - ? `File: ${c.path}${c.line ? `:${c.line}` : ""}` - : "General"; - return `[COMMENT_ID: ${c.githubCommentId}] [${location}] @${c.reviewer}:\n${c.body}`; + let location: string; + if (c.kind === "inline") { + location = c.path + ? `File: ${c.path}${c.line ? `:${c.line}` : ""}` + : "Inline comment"; + } else if (c.kind === "issue") { + location = "General PR Comment"; + } else { + location = "Review Summary"; + } + return `[COMMENT_ID: ${c.githubCommentId}] [KIND: ${c.kind}] [${location}] @${c.reviewer}:\n${c.body}`; }) .join("\n\n---\n\n"); @@ -293,6 +446,7 @@ export async function processTaskPr( reviewComments: reviewContext, prDescription: task.description ?? task.title, resumeSessionId: task.sessionId ?? undefined, + prDiff: prDiff ?? null, }); if (result.success) { @@ -336,13 +490,15 @@ export async function processTaskPr( ); } - // Fix null comment_ids — if counts match, zip them + // Fix null comment_ids — if counts match, zip them (and fill + // kind from the corresponding source row so routing still works) for (let i = 0; i < replies.length; i++) { if (!replies[i].comment_id && i < unprocessedComments.length) { replies[i].comment_id = unprocessedComments[i].githubCommentId; + replies[i].kind ??= unprocessedComments[i].kind; console.log( - `[pr-lifecycle] fixed null comment_id → ${replies[i].comment_id} (positional match)`, + `[pr-lifecycle] fixed null comment_id → ${replies[i].comment_id} (positional match, kind=${replies[i].kind})`, ); } } @@ -351,38 +507,79 @@ export async function processTaskPr( `[pr-lifecycle] Claude returned ${replies.length} reply(ies), push=${pushResult.success}`, replies.map((r) => ({ id: r.comment_id, + kind: r.kind, code: r.made_code_changes, reply: r.reply?.slice(0, 60), })), ); - // Post replies to GitHub and update DB + // Post replies to GitHub and update DB, routing by kind so + // each comment gets a reply on the correct endpoint. for (const r of replies) { if (!r.reply || !r.comment_id) continue; - try { - await replyToComment( - repoPath, - owner, - repo, - prNumber, - r.comment_id, - r.reply, + + // Resolve the source row: ID alone is not unique across + // kinds, so prefer Claude's echoed kind and fall back to + // ID-based lookup (with a warning if it's ambiguous). + const matches = unprocessedComments.filter( + (c) => c.githubCommentId === r.comment_id, + ); + const sourceComment = r.kind + ? matches.find((c) => c.kind === r.kind) + : matches[0]; + if (!sourceComment) { + console.warn( + `[pr-lifecycle] reply for unknown comment_id=${r.comment_id} kind=${r.kind ?? "?"}, skipping`, ); - await setCommentReply(r.comment_id, r.reply); + continue; + } + if (matches.length > 1 && !r.kind) { + console.warn( + `[pr-lifecycle] ambiguous reply for comment_id=${r.comment_id} (${matches.length} matches); Claude did not echo kind`, + ); + } + const kind = sourceComment.kind; + + try { + if (kind === "inline") { + await replyToComment( + repoPath, + owner, + repo, + prNumber, + r.comment_id, + r.reply, + ); + } else { + // Issue-level and review-summary replies both go + // to the issues endpoint as new top-level PR + // comments, stamped with a marker so we skip + // them on the next fetch. + await postPrCommentWithMarker( + repoPath, + owner, + repo, + prNumber, + task.id, + r.reply, + ); + } + await setCommentReply(kind, r.comment_id, r.reply); if (r.made_code_changes && result.commitSha) { await markCommentAddressed( + kind, r.comment_id, result.commitSha, ); } console.log( - `[pr-lifecycle] replied to comment ${r.comment_id} (code_changes=${r.made_code_changes})`, + `[pr-lifecycle] replied to ${kind} comment ${r.comment_id} (code_changes=${r.made_code_changes})`, ); } catch (e) { console.error( - `[pr-lifecycle] failed to post reply for comment ${r.comment_id}:`, + `[pr-lifecycle] failed to post reply for ${kind} comment ${r.comment_id}:`, e, ); } @@ -392,10 +589,13 @@ export async function processTaskPr( if (pushResult.success && result.commitSha) { for (const c of unprocessedComments) { const hasReply = replies.some( - (r) => r.comment_id === c.githubCommentId, + (r) => + r.comment_id === c.githubCommentId && + (r.kind ?? c.kind) === c.kind, ); if (!hasReply) { await markCommentAddressed( + c.kind, c.githubCommentId, result.commitSha, ); @@ -510,38 +710,54 @@ export async function prLifecycleTick(): Promise { const repoMap = new Map(repos.map((r) => [r.id, r])); const maxCycles = settings.maxReviewCycles ?? 5; - for (const pr of activePrs) { - const repo = repoMap.get(pr.repositoryId); - if (!repo) { - console.log( - `[pr-lifecycle] skipping PR ${pr.prNumber} — repo not found`, - ); - continue; - } + // Process all active PRs in parallel. Backend concurrency limits + // prevent too many Claude CLI instances from running at once — + // extra PRs will just get "Concurrency limit reached" and wait for + // the next tick. + await Promise.allSettled( + activePrs.map(async (pr) => { + const repo = repoMap.get(pr.repositoryId); + if (!repo) { + console.log( + `[pr-lifecycle] skipping PR ${pr.prNumber} — repo not found`, + ); + return; + } - const task = await getTask(pr.id); - if (!task) { - console.log( - `[pr-lifecycle] skipping PR ${pr.prNumber} — task not found`, - ); - continue; - } + const task = await getTask(pr.id); + if (!task) { + console.log( + `[pr-lifecycle] skipping PR ${pr.prNumber} — task not found`, + ); + return; + } - // Resolve per-repo auto-reply setting (override ?? global) - const overrides = await getProjectOverrides(pr.repositoryId); - const autoReply = - overrides.overridePrAutoReply ?? settings.prLifecycleEnabled; + // Skip PRs currently being addressed (immediate trigger or prior tick) + if (task.prState === "addressing") { + console.log( + `[pr-lifecycle] skipping PR #${pr.prNumber} — already being addressed`, + ); + return; + } - console.log( - `[pr-lifecycle] processing PR #${pr.prNumber} (${task.title}) — prState=${task.prState}, repo=${repo.name}, autoReply=${autoReply}`, - ); + const overrides = await getProjectOverrides(pr.repositoryId); + const autoReply = + overrides.overridePrAutoReply ?? settings.prLifecycleEnabled; - try { - await processTaskPr(task, repo.path, maxCycles, autoReply); - } catch (e) { - console.error(`[pr-lifecycle] error processing ${pr.prUrl}:`, e); - } - } + console.log( + `[pr-lifecycle] processing PR #${pr.prNumber} (${task.title}) — prState=${task.prState}, repo=${repo.name}, autoReply=${autoReply}`, + ); + + try { + await processTaskPr(task, repo.path, maxCycles, autoReply); + } catch (e) { + console.error( + `[pr-lifecycle] error processing ${pr.prUrl}:`, + e, + ); + } + }), + ); console.log("[pr-lifecycle] tick — done"); } diff --git a/src/core/types/agent.ts b/src/core/types/agent.ts index 146f5ad..cf17680 100644 --- a/src/core/types/agent.ts +++ b/src/core/types/agent.ts @@ -11,6 +11,8 @@ export interface AgentConfig { lastScanAt: string | undefined; lastWorkAt: string | undefined; priority: number; + /** Whether the repo participates in task discovery via scans. */ + scanEnabled: boolean; } export interface BudgetConfig { @@ -42,7 +44,27 @@ export interface CurrentTask { export interface EngineStatus { running: boolean; + /** First running task, kept for backward compatibility. */ currentTask: CurrentTask | undefined; + /** All currently running tasks. */ + runningTasks: CurrentTask[]; + /** Maximum number of tasks that can run concurrently. */ + concurrencyLimit: number; +} + +export interface ContentBlock { + kind: "text" | "tool_use" | "tool_result" | "thinking" | "result"; + text: string | undefined; + toolName: string | undefined; + toolTarget: string | undefined; +} + +export interface TaskOutputEvent { + taskId: string; + eventType: string | undefined; + blocks: ContentBlock[]; + raw: string; + timestamp: string; } export interface ScanResult { diff --git a/src/core/types/settings.ts b/src/core/types/settings.ts index 9a3f2a6..cece3ef 100644 --- a/src/core/types/settings.ts +++ b/src/core/types/settings.ts @@ -38,6 +38,9 @@ export interface GlobalSettings { // PR Lifecycle prLifecycleEnabled: boolean; maxReviewCycles: number; + + // Parallel execution + concurrencyLimit: number; } export interface ProjectOverrides { diff --git a/src/core/types/task.ts b/src/core/types/task.ts index 25d5358..ebed3df 100644 --- a/src/core/types/task.ts +++ b/src/core/types/task.ts @@ -17,7 +17,7 @@ export type TaskState = | "dismissed" | "failed"; -export type TaskSource = "manual" | "scan" | "linear"; +export type TaskSource = "manual" | "scan" | "linear" | "imported"; export type EstimatedEffort = "low" | "medium" | "high"; export type PrState = @@ -47,6 +47,7 @@ export interface Task { filesInvolved: string[] | undefined; baseBranch: string | undefined; branchName: string | undefined; + worktreePath: string | undefined; commitSha: string | undefined; sessionId: string | undefined; tokensUsed: number; @@ -75,10 +76,13 @@ export interface PrReview { createdAt: string; } +export type PrCommentKind = "inline" | "issue" | "review_summary"; + export interface PrComment { id: string; taskId: string; githubCommentId: number; + kind: PrCommentKind; inReplyToId: number | undefined; reviewer: string; body: string; diff --git a/src/ui/components/main/EmptyState.tsx b/src/ui/components/main/EmptyState.tsx index ac04f44..db9649c 100644 --- a/src/ui/components/main/EmptyState.tsx +++ b/src/ui/components/main/EmptyState.tsx @@ -21,11 +21,11 @@ export function EmptyState() {

- Scanning for tasks... -

-

Select a project from the sidebar to get started.

+

+ Or paste a PR link in the sidebar to import one. +

); } diff --git a/src/ui/components/onboarding/AddProjectStep.tsx b/src/ui/components/onboarding/AddProjectStep.tsx index 61e557b..283ab14 100644 --- a/src/ui/components/onboarding/AddProjectStep.tsx +++ b/src/ui/components/onboarding/AddProjectStep.tsx @@ -282,6 +282,17 @@ export function AddProjectStep({ onNext }: AddProjectStepProps) { {error} )} + +
+ +
); } diff --git a/src/ui/components/settings/sections/GeneralSection.tsx b/src/ui/components/settings/sections/GeneralSection.tsx index 8c6d7dc..19fe90b 100644 --- a/src/ui/components/settings/sections/GeneralSection.tsx +++ b/src/ui/components/settings/sections/GeneralSection.tsx @@ -132,6 +132,37 @@ export function GeneralSection() { /> + +
+ + + +
{/*
+ {/* Scan for tasks */} +
+
+

+ Scan for tasks +

+

+ Let the agent analyze this repo and suggest + improvements. Turn off for projects where you only + want to handle imported PRs. +

+
+
+ + updateAgentConfig({ + repositoryId, + scanEnabled: checked, + }) + } + /> +
+
+ {/* PR auto-reply */}
diff --git a/src/ui/components/sidebar/Sidebar.tsx b/src/ui/components/sidebar/Sidebar.tsx index 5616a4f..7905375 100644 --- a/src/ui/components/sidebar/Sidebar.tsx +++ b/src/ui/components/sidebar/Sidebar.tsx @@ -1,9 +1,10 @@ -import { useState } from "react"; +import { useState, useRef } from "react"; import type { CSSProperties } from "react"; -import { Plus, Search } from "lucide-react"; +import { Plus, Search, Link } from "lucide-react"; import { useAppStore } from "@core/store/app-store"; import { useScanNow } from "@core/api/useEngine"; import { useGlobalSettings } from "@core/api/useSettings"; +import { useImportPr } from "@core/api/useImportPr"; import { ProjectList } from "./ProjectList"; import { SidebarFooter } from "./SidebarFooter"; import { AddProjectDialog } from "./AddProjectDialog"; @@ -17,8 +18,19 @@ export function Sidebar({ style }: SidebarProps) { const setSelectedRepository = useAppStore((s) => s.setSelectedRepository); const scanNow = useScanNow(); const { data: globalSettings } = useGlobalSettings(); + const importPr = useImportPr(); const [isAddDialogOpen, setIsAddDialogOpen] = useState(false); const [search, setSearch] = useState(""); + const [prUrl, setPrUrl] = useState(""); + const prInputRef = useRef(null); + + function handleImportPr() { + const url = prUrl.trim(); + if (!url) return; + // Clear input immediately — progress shows in a toast + setPrUrl(""); + importPr.mutate(url); + } return (
+ {/* Import PR */} +
+
+ + setPrUrl(e.target.value)} + onKeyDown={(e) => { + if (e.key === "Enter") handleImportPr(); + if (e.key === "Escape") { + setPrUrl(""); + prInputRef.current?.blur(); + } + }} + className="w-full rounded-md border border-sidebar-border bg-transparent py-1.5 pl-7 pr-2 text-[12px] text-sidebar-foreground placeholder:text-sidebar-foreground/35 focus:outline-none focus:ring-1 focus:ring-ring" + /> +
+
+ {/* Project list */}
diff --git a/src/ui/components/tasks/TaskDetailView.tsx b/src/ui/components/tasks/TaskDetailView.tsx index 8f28f0d..045d8b7 100644 --- a/src/ui/components/tasks/TaskDetailView.tsx +++ b/src/ui/components/tasks/TaskDetailView.tsx @@ -12,6 +12,7 @@ import { Loader2, AlertTriangle, LayoutDashboard, + Terminal, } from "lucide-react"; import { useTask, @@ -50,6 +51,8 @@ import { TaskStatusBanner } from "./TaskStatusBanner"; import { queuedToast } from "@ui/lib/toast"; import { FileContentViewer } from "./FileContentViewer"; import { ErrorBoundary } from "@ui/components/ErrorBoundary"; +import { useTaskOutputStream } from "@ui/hooks/useTaskOutputStream"; +import { TaskLiveOutput } from "./TaskLiveOutput"; // ── Constants ─────────────────────────────────────────────── @@ -287,10 +290,27 @@ export function TaskDetailView({ taskId }: TaskDetailViewProps) { const isQueued = useQueueStore((s) => s.isQueued(taskId)); const queuePosition = useQueueStore((s) => s.queuePosition(taskId)); + // ── Live output streaming ── + const { lines: outputLines, isStreaming } = useTaskOutputStream(taskId); + const showAgentTab = + task?.state === "in_progress" || outputLines.length > 0; + // ── Tab state (typed) ── const [openTabs, setOpenTabs] = useState([]); const [activeTab, setActiveTab] = useState("overview"); + // Auto-switch to Agent tab when task starts running + const prevStateRef = useRef(task?.state); + useEffect(() => { + if ( + prevStateRef.current !== "in_progress" && + task?.state === "in_progress" + ) { + setActiveTab("agent"); + } + prevStateRef.current = task?.state; + }, [task?.state]); + // ── Feedback mode (Request Changes flow) ── const [feedbackMode, setFeedbackMode] = useState(false); @@ -391,10 +411,11 @@ export function TaskDetailView({ taskId }: TaskDetailViewProps) { commentId, body, ); - // Update local DB + // Update local DB — this UI path only replies to inline + // review comments, so the kind is always "inline". const { setCommentReply } = await import("@core/db/pr-lifecycle"); - await setCommentReply(commentId, body); + await setCommentReply("inline", commentId, body); // Invalidate to refresh void queryClient.invalidateQueries({ queryKey: ["pr-comments", taskId], @@ -577,8 +598,10 @@ export function TaskDetailView({ taskId }: TaskDetailViewProps) { resumeSessionId, }; - // If another task is running, queue this one instead - if (engineStatus?.currentTask) { + // If at concurrency capacity, queue this one instead + const runningCount = engineStatus?.runningTasks?.length ?? 0; + const limit = engineStatus?.concurrencyLimit ?? 1; + if (runningCount >= limit) { enqueue(params); queuedToast(); return; @@ -869,7 +892,7 @@ export function TaskDetailView({ taskId }: TaskDetailViewProps) { // ── Determine content to show ── - const showTabBar = openTabs.length > 0 || hasChanges; + const showTabBar = openTabs.length > 0 || hasChanges || showAgentTab; return (
@@ -904,6 +927,30 @@ export function TaskDetailView({ taskId }: TaskDetailViewProps) { )} + {showAgentTab && ( + + )} + {openTabs.map((tab) => { const isActive = activeTab === tab.path; return ( @@ -992,6 +1039,11 @@ export function TaskDetailView({ taskId }: TaskDetailViewProps) { relativePath={activeTab} />
+ ) : activeTab === "agent" ? ( + ) : (
{task.state === "in_progress" && ( diff --git a/src/ui/components/tasks/TaskLiveOutput.tsx b/src/ui/components/tasks/TaskLiveOutput.tsx new file mode 100644 index 0000000..a0cd20f --- /dev/null +++ b/src/ui/components/tasks/TaskLiveOutput.tsx @@ -0,0 +1,476 @@ +import { useRef, useEffect, useState, useMemo } from "react"; +import { + ArrowDown, + Wrench, + FileText, + Brain, + CheckCircle2, + AlertTriangle, + Lightbulb, +} from "lucide-react"; +import hljs from "highlight.js"; +import "highlight.js/styles/github-dark.css"; +import type { TaskOutputEvent, ContentBlock } from "@core/types/agent"; + +interface TaskLiveOutputProps { + lines: TaskOutputEvent[]; + isStreaming: boolean; +} + +function formatTime(timestamp: string): string { + try { + const d = new Date(timestamp); + return d.toLocaleTimeString([], { + hour: "2-digit", + minute: "2-digit", + second: "2-digit", + }); + } catch { + return ""; + } +} + +// ── Structured JSON detection ────────────────────────────── + +interface ReviewJson { + passed?: boolean; + feedback?: string; + issues?: { description: string; severity: string }[]; +} + +interface ImplementJson { + files_modified?: string[]; + summary?: string; + tests_added?: boolean; +} + +/** + * Try to extract a structured JSON object from a text block. + * Handles plain JSON and fenced code blocks (```json ... ```). + */ +function tryParseStructured( + text: string, +): + | { kind: "review"; data: ReviewJson } + | { kind: "implement"; data: ImplementJson } + | null { + const trimmed = text.trim(); + // Try fenced code block first + const fencedMatch = trimmed.match(/```(?:json)?\s*\n([\s\S]*?)\n```/); + const jsonStr = fencedMatch ? fencedMatch[1] : trimmed; + + // Must look like a JSON object + if (!jsonStr.startsWith("{")) return null; + + try { + const parsed = JSON.parse(jsonStr) as Record; + if ("passed" in parsed || "issues" in parsed) { + return { kind: "review", data: parsed as ReviewJson }; + } + if ("files_modified" in parsed || "tests_added" in parsed) { + return { kind: "implement", data: parsed as ImplementJson }; + } + } catch { + // Not valid JSON + } + return null; +} + +// ── Structured result cards ──────────────────────────────── + +function ReviewResultCard({ data }: { data: ReviewJson }) { + const passed = data.passed === true; + const issues = data.issues ?? []; + const critical = issues.filter((i) => i.severity === "critical"); + const suggestions = issues.filter((i) => i.severity !== "critical"); + + return ( +
+
+ {passed ? ( + <> + + + Review passed + + + ) : ( + <> + + + Review rejected + + + )} +
+ {data.feedback && ( +

+ {data.feedback} +

+ )} + {critical.length > 0 && ( +
+
+ Critical +
+ {critical.map((issue, i) => ( +
+ +

+ {issue.description} +

+
+ ))} +
+ )} + {suggestions.length > 0 && ( +
+
+ Suggestions +
+ {suggestions.map((issue, i) => ( +
+ +

+ {issue.description} +

+
+ ))} +
+ )} +
+ ); +} + +function ImplementResultCard({ data }: { data: ImplementJson }) { + return ( +
+
+ + + Implementation + +
+ {data.summary && ( +

+ {data.summary} +

+ )} + {data.files_modified && data.files_modified.length > 0 && ( +
+
+ Files modified +
+ {data.files_modified.map((f) => ( +
+ + + {f} + +
+ ))} +
+ )} +
+ ); +} + +// ── Syntax-highlighted code ─────────────────────────────── + +/** Detect and render fenced code blocks with syntax highlighting. */ +function CodeWithHighlighting({ text }: { text: string }) { + const parts = useMemo(() => { + // Split on fenced code blocks + const regex = /```(\w+)?\n([\s\S]*?)```/g; + const result: Array< + | { type: "text"; value: string } + | { type: "code"; lang: string; value: string } + > = []; + let lastIndex = 0; + let match; + while ((match = regex.exec(text)) !== null) { + if (match.index > lastIndex) { + result.push({ + type: "text", + value: text.slice(lastIndex, match.index), + }); + } + result.push({ + type: "code", + lang: match[1] || "plaintext", + value: match[2], + }); + lastIndex = match.index + match[0].length; + } + if (lastIndex < text.length) { + result.push({ type: "text", value: text.slice(lastIndex) }); + } + return result; + }, [text]); + + return ( +
+ {parts.map((part, i) => { + if (part.type === "text") { + if (!part.value.trim()) return null; + return ( +

+ {part.value} +

+ ); + } + let highlighted: string; + try { + highlighted = hljs.highlight(part.value, { + language: part.lang, + ignoreIllegals: true, + }).value; + } catch { + highlighted = hljs.highlightAuto(part.value).value; + } + return ( +
+                        
+                    
+ ); + })} +
+ ); +} + +// ── Content block renderer ───────────────────────────────── + +function BlockView({ block }: { block: ContentBlock }) { + if (block.kind === "text") { + if (!block.text) return null; + + // Try to detect structured JSON output (review/implement result) + const structured = tryParseStructured(block.text); + if (structured?.kind === "review") { + return ; + } + if (structured?.kind === "implement") { + return ; + } + + // Check for fenced code blocks + if (block.text.includes("```")) { + return ; + } + + return ( +

+ {block.text} +

+ ); + } + + if (block.kind === "thinking") { + if (!block.text) return null; + return ( +
+ +

+ {block.text} +

+
+ ); + } + + if (block.kind === "tool_use") { + return ( +
+ + + + {block.toolName ?? "tool"} + + {block.toolTarget && ( + + {block.toolTarget} + + )} + +
+ ); + } + + if (block.kind === "tool_result") { + if (!block.text) return null; + // Detect diff output for syntax highlighting + const looksLikeDiff = + block.text.startsWith("diff ") || + block.text.startsWith("---") || + /^@@\s/.test(block.text); + if (looksLikeDiff) { + let highlighted: string; + try { + highlighted = hljs.highlight(block.text, { + language: "diff", + ignoreIllegals: true, + }).value; + } catch { + highlighted = block.text; + } + return ( +
+ +
+                        
+                    
+
+ ); + } + return ( +
+ +

+ {block.text} +

+
+ ); + } + + if (block.kind === "result") { + return ( +
+ + + Completed + +
+ ); + } + + return null; +} + +function EventRow({ event }: { event: TaskOutputEvent }) { + if (event.eventType === "system") { + return ( +
+ + {formatTime(event.timestamp)} + + session started +
+ ); + } + + if (event.blocks.length === 0) return null; + + return ( +
+ + {formatTime(event.timestamp)} + +
+ {event.blocks.map((block, i) => ( + + ))} +
+
+ ); +} + +export function TaskLiveOutput({ lines, isStreaming }: TaskLiveOutputProps) { + const scrollRef = useRef(null); + const [autoScroll, setAutoScroll] = useState(true); + + useEffect(() => { + if (autoScroll && scrollRef.current) { + scrollRef.current.scrollTop = scrollRef.current.scrollHeight; + } + }, [lines.length, autoScroll]); + + function handleScroll() { + if (!scrollRef.current) return; + const { scrollTop, scrollHeight, clientHeight } = scrollRef.current; + const atBottom = scrollHeight - scrollTop - clientHeight < 40; + setAutoScroll(atBottom); + } + + const meaningfulLines = lines.filter( + (e) => e.eventType === "system" || e.blocks.length > 0, + ); + + if (meaningfulLines.length === 0) { + return ( +
+

+ {isStreaming + ? "Waiting for output..." + : "No output yet. Start a task to see live output here."} +

+
+ ); + } + + return ( +
+
+
+ {isStreaming && ( + + + + + )} + + {isStreaming ? "Streaming" : "Completed"} —{" "} + {meaningfulLines.length} event + {meaningfulLines.length !== 1 ? "s" : ""} + +
+
+ +
+ {meaningfulLines.map((line, i) => ( + + ))} +
+ + {!autoScroll && isStreaming && ( + + )} +
+ ); +} diff --git a/src/ui/components/tasks/TaskStatusBanner.tsx b/src/ui/components/tasks/TaskStatusBanner.tsx index d0b11cf..ffa4d51 100644 --- a/src/ui/components/tasks/TaskStatusBanner.tsx +++ b/src/ui/components/tasks/TaskStatusBanner.tsx @@ -35,8 +35,9 @@ export function TaskStatusBanner({ taskId }: TaskStatusBannerProps) { }; }, [taskId]); - const isWorking = status?.currentTask?.taskId === taskId; - const phase = status?.currentTask?.phase; + const runningTask = status?.runningTasks?.find((t) => t.taskId === taskId); + const isWorking = runningTask !== undefined; + const phase = runningTask?.phase; // Clear scan-waiting state once the engine picks up the task useEffect(() => { diff --git a/src/ui/hooks/useTaskOutputStream.ts b/src/ui/hooks/useTaskOutputStream.ts new file mode 100644 index 0000000..b27bbd3 --- /dev/null +++ b/src/ui/hooks/useTaskOutputStream.ts @@ -0,0 +1,68 @@ +import { useState, useEffect } from "react"; +import { listen } from "@tauri-apps/api/event"; +import type { TaskOutputEvent } from "@core/types/agent"; +import { listAgentEvents, saveAgentEvent } from "@core/db/task-agent-events"; + +const MAX_LINES = 500; + +export function useTaskOutputStream(taskId: string | undefined) { + const [lines, setLines] = useState([]); + const [loaded, setLoaded] = useState(false); + + // Load historical events from the DB + useEffect(() => { + if (!taskId) return; + setLines([]); + setLoaded(false); + let cancelled = false; + + void listAgentEvents(taskId).then((historical) => { + if (cancelled) return; + setLines(historical); + setLoaded(true); + }); + + return () => { + cancelled = true; + }; + }, [taskId]); + + // Listen for live events + useEffect(() => { + if (!taskId) return; + + const unlisten = listen( + "agent:task-output", + (event) => { + if (event.payload.taskId !== taskId) return; + + setLines((prev) => { + const next = [...prev, event.payload]; + return next.length > MAX_LINES + ? next.slice(-MAX_LINES) + : next; + }); + + // Persist asynchronously + void saveAgentEvent(event.payload).catch((e) => { + console.warn( + "[useTaskOutputStream] failed to persist event:", + e, + ); + }); + }, + ); + + return () => { + void unlisten.then((fn) => fn()); + }; + }, [taskId]); + + // Streaming if the last event isn't a "result" (completion) event + const isStreaming = + loaded && + lines.length > 0 && + lines[lines.length - 1]?.eventType !== "result"; + + return { lines, isStreaming }; +} diff --git a/src/ui/lib/toast.tsx b/src/ui/lib/toast.tsx index 545af3f..bdb0c8a 100644 --- a/src/ui/lib/toast.tsx +++ b/src/ui/lib/toast.tsx @@ -115,6 +115,48 @@ export function updateAvailableToast(version: string, onInstall: () => void) { ); } +export function prImportProgressToast(toastId: string, step: string) { + toast.custom( + () => ( +
+ + {step} +
+ ), + { id: toastId, duration: Infinity }, + ); +} + +export function prImportSuccessToast(toastId: string, prNumber: number) { + toast.custom( + () => ( +
+
+ +
+ + PR #{prNumber} imported + +
+ ), + { id: toastId, duration: 3000 }, + ); +} + +export function prImportErrorToast(toastId: string, error: string) { + toast.custom( + () => ( +
+ + + {error} + +
+ ), + { id: toastId, duration: 5000 }, + ); +} + function updateInstallingToast() { toast.custom( () => (