Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 164 additions & 2 deletions src/api/routes/enterprise.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from __future__ import annotations

import asyncio
import logging
from typing import Any, Dict, List, Optional

Expand Down Expand Up @@ -835,6 +836,108 @@ async def delete_annotation(
)


# ---------------------------------------------------------------------------
# Helper: Extract and create annotations from chat using Code Agent
# ---------------------------------------------------------------------------

async def _extract_and_create_annotations(
query: str,
project_id: str,
user: dict,
ann_store: TeamAnnotationStore,
project_store: ProjectStore,
) -> List[str]:
"""Extract annotations from chat message using Code Agent and store them.

Args:
query: The user's chat message
project_id: The project ID
user: The user dict containing id, name, etc.
ann_store: The annotation store instance
project_store: The project store instance

Returns:
List of created annotation IDs
"""
created_ids = []

try:
# Import Code Agent
from src.agents.code import CodeAgent
from src.llm.factory import get_llm

# Initialize the Code Agent with the LLM
llm = get_llm()
agent = CodeAgent(model=llm)

# Run the agent to extract annotations
state = {"classifier_output": query}
result = await agent.arun(state)

if result.is_empty:
logger.info("No annotations extracted from chat message")
return created_ids

# Get user info
user_id = user.get("id") or user.get("google_id")
username = user.get("username") or user.get("name") or user_id

# Get user's role in the project
user_role_obj = project_store.get_user_role_in_project(project_id, user_id)
user_role = user_role_obj.value if user_role_obj else "member"

# Get project info for repo
project = project_store.get_project(project_id)
repo = project.get("repo", "unknown") if project else "unknown"

# Create annotations from extracted results
for ann in result.annotations:
try:
# Map annotation type to a valid type for storage
ann_type = ann.annotation_type.value if ann.annotation_type else "explanation"

# Determine severity
severity = None
if ann.severity:
severity = ann.severity.value

# Create the annotation
annotation_id = ann_store.create_annotation(
project_id=project_id,
content=ann.content,
author_id=user_id,
author_name=username,
author_role=user_role,
annotation_type=ann_type,
severity=severity,
file_path=ann.target_file,
symbol_name=ann.target_symbol,
repo=ann.repo or repo,
)

if annotation_id:
created_ids.append(annotation_id)
logger.info(
"Created annotation %s from chat for %s",
annotation_id,
ann.target_symbol or ann.target_file or "general",
)

except Exception as e:
logger.warning("Failed to create annotation from chat: %s", e)
continue

# Update project annotation count
if created_ids:
project_store.increment_annotation_count(project_id, len(created_ids))
logger.info("Created %d annotations from chat message", len(created_ids))

except Exception as e:
logger.warning("Annotation extraction from chat failed: %s", e)

return created_ids


# ---------------------------------------------------------------------------
# Team chat route
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -864,6 +967,13 @@ async def team_chat(
if project is None:
raise HTTPException(status_code=404, detail="Project not found")

# Get user's role in project
user_role_obj = project_store.get_user_role_in_project(project_id, user_id)
user_role = user_role_obj.value if user_role_obj else "member"

# Check if user can create annotations (all except INTERN)
can_create_annotations = project_store.check_user_can_annotate(project_id, user_id)

# Get relevant annotations
relevant_annotations = await ann_store.search_relevant_for_query(
project_id=project_id,
Expand All @@ -873,6 +983,15 @@ async def team_chat(
top_k=5,
)

# For interns and SDE2, also get manager instructions
manager_instructions = []
if user_role in ("intern", "sde2"):
manager_instructions = await ann_store.get_manager_instructions(
project_id=project_id,
target_role=user_role,
top_k=10,
)

# Get the code pipeline with project_id for annotation retrieval
pipeline = get_code_pipeline(
org_id=project["org_id"],
Expand All @@ -882,19 +1001,53 @@ async def team_chat(

# Modify the query to include annotation context if relevant annotations found
enhanced_query = req.query
context_parts = []

# Add relevant team annotations
if relevant_annotations:
ann_context = "\n\nTeam Knowledge:\n"
ann_context = "Team Knowledge:\n"
for ann in relevant_annotations:
ann_context += f"- [{ann.get('annotation_type')}] {ann.get('content')[:200]}... "
ann_context += f"(by {ann.get('author_name')}, {ann.get('author_role')})\n"
enhanced_query = req.query + ann_context
context_parts.append(ann_context)

# Add manager instructions for interns and SDE2
if manager_instructions:
instruction_context = "Manager Instructions:\n"
for instruction in manager_instructions:
instruction_context += f"- [{instruction.get('annotation_type')}] {instruction.get('content')[:250]}... "
instruction_context += f"(from {instruction.get('author_name')})\n"
context_parts.append(instruction_context)

# Combine all context with the query
if context_parts:
enhanced_query = req.query + "\n\n" + "\n".join(context_parts)

# If user can create annotations, extract them from the chat message
# This runs in the background while streaming the response
annotation_task = None
if can_create_annotations:
# Start annotation extraction as a background task
annotation_task = asyncio.create_task(
_extract_and_create_annotations(
query=req.query,
project_id=project_id,
user=user,
ann_store=ann_store,
project_store=project_store,
)
)

# Stream the response
async def generate():
# First yield the relevant annotations as context
if relevant_annotations:
yield f'{{"type": "annotations", "annotations": {jsonable_encoder(relevant_annotations)}}}\n'

# Yield manager instructions for interns/SDE2
if manager_instructions:
yield f'{{"type": "manager_instructions", "instructions": {jsonable_encoder(manager_instructions)}}}\n'

# Then stream the chat response
async for chunk in pipeline.run_stream(
query=enhanced_query,
Expand All @@ -904,6 +1057,15 @@ async def generate():
):
yield chunk

# After streaming, yield annotation creation result if applicable
if annotation_task:
try:
created_ids = await annotation_task
if created_ids:
yield f'{{"type": "annotations_created", "count": {len(created_ids)}, "ids": {jsonable_encoder(created_ids)}}}\n'
except Exception as e:
logger.warning("Failed to get annotation extraction result: %s", e)

return StreamingResponse(
generate(),
media_type="application/x-ndjson",
Expand Down
7 changes: 5 additions & 2 deletions src/database/project_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,12 @@ def get_user_role_in_project(self, project_id: str, user_id: str) -> Optional[Te
return None

def check_user_can_annotate(self, project_id: str, user_id: str) -> bool:
"""Check if user can create annotations (all roles can)."""
"""Check if user can create annotations (all roles except INTERN)."""
role = self.get_user_role_in_project(project_id, user_id)
return role is not None
if role is None:
return False
# All roles except INTERN can create annotations
return role != TeamRole.INTERN

def check_user_can_manage_team(self, project_id: str, user_id: str) -> bool:
"""Check if user can manage team (manager only)."""
Expand Down
2 changes: 1 addition & 1 deletion src/pipelines/code_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ async def _search_annotations(
from src.storage.team_annotation_store import TeamAnnotationStore

store = TeamAnnotationStore()
results = store.search_annotations(
results = await store.search_annotations(
project_id=project_id,
query=query,
top_k=top_k,
Expand Down
57 changes: 57 additions & 0 deletions src/storage/team_annotation_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,63 @@ async def search_relevant_for_query(

return annotations

async def get_manager_instructions(
self,
project_id: str,
target_role: Optional[str] = None,
top_k: int = 20,
) -> List[Dict[str, Any]]:
"""Get manager instructions for team members.

This retrieves annotations created by managers that are marked as
instructions or general guidance for the team.

Args:
project_id: The project ID
target_role: Optional target role to filter for (e.g., 'intern', 'sde2')
top_k: Maximum number of instructions to return

Returns:
List of manager instruction annotation dicts
"""
store = self._get_store(project_id)

# Build filters for manager annotations
filters = {
"status": "active",
"author_role": "manager",
}

# Search for all manager annotations
# We use a broad query to get instructions
results = await store.search_by_text(
query_text="instruction guidance manager note important",
top_k=top_k,
filters=filters,
)

instructions = []
for r in results:
meta = r.metadata or {}
# Include annotations that are instructions or general explanations
ann_type = meta.get("annotation_type", "explanation")
if ann_type in ("instruction", "explanation", "warning", "feature_idea"):
instruction = {
"id": r.id,
"content": r.content,
"author_name": meta.get("author_name", "Manager"),
"author_role": meta.get("author_role", "manager"),
"annotation_type": ann_type,
"created_at": meta.get("created_at"),
"file_path": meta.get("file_path"),
"symbol_name": meta.get("symbol_name"),
**meta,
}
instructions.append(instruction)

logger.info("Retrieved %d manager instructions for project %s", len(instructions), project_id)
return instructions

def clear_project_annotations(
self,
project_id: str,
Expand Down