-
Notifications
You must be signed in to change notification settings - Fork 35
feat(datafabric): add OTEL span instrumentation for SQL query execution #964
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| """UiPath Advanced agent implementation.""" | ||
|
|
||
| from deepagents import CompiledSubAgent, SubAgent | ||
| from deepagents.backends import BackendProtocol, FilesystemBackend | ||
| from deepagents.backends.protocol import BackendFactory | ||
|
|
||
| from .agent import create_advanced_agent, create_advanced_agent_graph | ||
| from .types import AdvancedAgentGraphState | ||
| from .utils import ( | ||
| MEMORY_DIR_NAME, | ||
| MEMORY_INDEX_FILENAME, | ||
| MEMORY_INDEX_VIRTUAL_PATH, | ||
| create_state_with_input, | ||
| ) | ||
|
|
||
| __all__ = [ | ||
| "MEMORY_DIR_NAME", | ||
| "MEMORY_INDEX_FILENAME", | ||
| "MEMORY_INDEX_VIRTUAL_PATH", | ||
| "AdvancedAgentGraphState", | ||
| "BackendFactory", | ||
| "BackendProtocol", | ||
| "CompiledSubAgent", | ||
| "FilesystemBackend", | ||
| "SubAgent", | ||
| "create_advanced_agent", | ||
| "create_advanced_agent_graph", | ||
| "create_state_with_input", | ||
| ] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| """Advanced agent builder.""" | ||
|
|
||
| from collections.abc import Callable, Sequence | ||
| from typing import Any | ||
|
|
||
| from deepagents import CompiledSubAgent, SubAgent | ||
| from deepagents import create_deep_agent as _create_deep_agent | ||
| from deepagents.backends import BackendProtocol | ||
| from deepagents.backends.filesystem import FilesystemBackend | ||
| from deepagents.backends.protocol import BackendFactory | ||
| from langchain.agents.structured_output import ResponseFormat | ||
| from langchain_core.language_models import BaseChatModel | ||
| from langchain_core.messages import HumanMessage | ||
| from langchain_core.tools import BaseTool | ||
| from langgraph.graph import END, START | ||
| from langgraph.graph.state import CompiledStateGraph, StateGraph | ||
| from pydantic import BaseModel | ||
|
|
||
| from uipath_langchain.agent.react.job_attachments import get_job_attachment_paths | ||
|
|
||
| from .types import AdvancedAgentGraphState | ||
| from .utils import ( | ||
| MEMORY_INDEX_VIRTUAL_PATH, | ||
| create_state_with_input, | ||
| resolve_input_attachments, | ||
| ) | ||
|
|
||
|
|
||
| def create_advanced_agent( | ||
| model: BaseChatModel, | ||
| system_prompt: str = "", | ||
| tools: Sequence[BaseTool] = (), | ||
| subagents: Sequence[SubAgent | CompiledSubAgent] = (), | ||
| backend: BackendProtocol | BackendFactory | None = None, | ||
| response_format: ResponseFormat[Any] | None = None, | ||
| memory: Sequence[str] = (), | ||
| ) -> CompiledStateGraph[Any, Any, Any, Any]: | ||
| """Create a deepagents agent with planning, filesystem, and sub-agent tools. | ||
|
|
||
| ``memory`` is a list of file paths loaded via deepagents' ``MemoryMiddleware``: | ||
| each is read from ``backend`` and injected into the system prompt every turn, | ||
| and the model maintains them with ``edit_file``. Empty disables the middleware. | ||
| """ | ||
| return _create_deep_agent( | ||
| model=model, | ||
| system_prompt=system_prompt, | ||
| tools=list(tools), | ||
| subagents=list(subagents), | ||
| backend=backend, | ||
| response_format=response_format, | ||
| memory=list(memory) or None, | ||
| ) | ||
|
|
||
|
|
||
| def create_advanced_agent_graph( | ||
| model: BaseChatModel, | ||
| tools: Sequence[BaseTool], | ||
| system_prompt: str, | ||
| backend: BackendProtocol | BackendFactory | None, | ||
| response_format: ResponseFormat[Any] | None, | ||
| input_schema: type[BaseModel] | None, | ||
| output_schema: type[BaseModel], | ||
| build_user_message: Callable[[dict[str, Any]], str], | ||
| ) -> StateGraph[Any, Any, Any, Any]: | ||
| """Wrap the advanced agent in a parent graph that maps typed I/O to/from messages. | ||
|
|
||
| With a ``FilesystemBackend``, attachment-shaped inputs are downloaded into the | ||
| workspace and given a ``FilePath`` before the user message is built. A | ||
| ``FilesystemBackend`` also enables workspace memory: deepagents' | ||
| ``MemoryMiddleware`` reads ``/memory/MEMORY.md`` from the backend each turn. | ||
| Memory stays disabled for non-filesystem backends, which carry no workspace. | ||
| """ | ||
| memory_sources = ( | ||
| [MEMORY_INDEX_VIRTUAL_PATH] if isinstance(backend, FilesystemBackend) else [] | ||
| ) | ||
|
|
||
| inner_graph = create_advanced_agent( | ||
| model=model, | ||
| tools=tools, | ||
| system_prompt=system_prompt, | ||
| backend=backend, | ||
| response_format=response_format, | ||
| memory=memory_sources, | ||
| ) | ||
|
|
||
| wrapper_state = create_state_with_input(input_schema) | ||
| internal_fields = set(AdvancedAgentGraphState.model_fields.keys()) | ||
| attachment_paths = ( | ||
| get_job_attachment_paths(input_schema) if input_schema is not None else [] | ||
| ) | ||
|
|
||
| async def transform_input_async(state: BaseModel) -> dict[str, Any]: | ||
| state_data = state.model_dump() | ||
| input_data = {k: v for k, v in state_data.items() if k not in internal_fields} | ||
| input_args = ( | ||
| input_schema.model_validate(input_data).model_dump(by_alias=True) | ||
| if input_schema is not None | ||
| else {} | ||
| ) | ||
| if attachment_paths: | ||
| input_args = await resolve_input_attachments( | ||
| backend, attachment_paths, input_args | ||
| ) | ||
| user_text = build_user_message(input_args) | ||
| return {"messages": [HumanMessage(content=user_text, id="user-input")]} | ||
|
|
||
| def transform_output(state: BaseModel) -> dict[str, Any]: | ||
| structured = getattr(state, "structured_response", {}) | ||
| return output_schema.model_validate(structured).model_dump() | ||
|
|
||
| wrapper: StateGraph[Any, Any, Any, Any] = StateGraph( | ||
| wrapper_state, input_schema=input_schema, output_schema=output_schema | ||
| ) | ||
| wrapper.add_node("transform_input", transform_input_async) | ||
| wrapper.add_node("advanced_agent", inner_graph) | ||
| wrapper.add_node("transform_output", transform_output) | ||
| wrapper.add_edge(START, "transform_input") | ||
| wrapper.add_edge("transform_input", "advanced_agent") | ||
| wrapper.add_edge("advanced_agent", "transform_output") | ||
| wrapper.add_edge("transform_output", END) | ||
|
|
||
| return wrapper |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| """State types for the advanced agent wrapper graph.""" | ||
|
|
||
| from typing import Annotated, Any | ||
|
|
||
| from langchain_core.messages import AnyMessage | ||
| from langgraph.graph.message import add_messages | ||
| from pydantic import BaseModel | ||
|
|
||
|
|
||
| class AdvancedAgentGraphState(BaseModel): | ||
| """Graph state for the advanced agent wrapper.""" | ||
|
|
||
| messages: Annotated[list[AnyMessage], add_messages] = [] | ||
| structured_response: dict[str, Any] = {} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,110 @@ | ||
| """Advanced agent utilities.""" | ||
|
|
||
| import asyncio | ||
| import copy | ||
| import logging | ||
| import uuid | ||
| from pathlib import Path | ||
| from typing import Any, NamedTuple, cast | ||
|
|
||
| from deepagents.backends import BackendProtocol, FilesystemBackend | ||
| from deepagents.backends.protocol import BackendFactory | ||
| from jsonpath_ng import parse as jsonpath_parse # type: ignore[import-untyped] | ||
| from pydantic import BaseModel | ||
| from uipath.platform import UiPath | ||
| from uipath.platform.attachments import Attachment | ||
|
|
||
| from .types import AdvancedAgentGraphState | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| # --- Workspace memory layout --- | ||
| # Durable memory lives under <workspace>/memory/: MEMORY.md is the always-loaded | ||
| # index, entries live in <workspace>/memory/<name>.md. deepagents' MemoryMiddleware | ||
| # handles loading/injection, but backed by the agent's FilesystemBackend (run-scoped, | ||
| # persisted via WorkspaceHydrator) rather than the cross-run StoreBackend. | ||
| MEMORY_DIR_NAME = "memory" | ||
| MEMORY_INDEX_FILENAME = "MEMORY.md" | ||
|
|
||
| # Virtual path handed to MemoryMiddleware as a source; the agent's virtual-mode | ||
| # FilesystemBackend resolves it under the workspace root. | ||
| MEMORY_INDEX_VIRTUAL_PATH = f"/{MEMORY_DIR_NAME}/{MEMORY_INDEX_FILENAME}" | ||
|
|
||
|
|
||
| def create_state_with_input( | ||
| input_schema: type[BaseModel] | None, | ||
| ) -> type[AdvancedAgentGraphState]: | ||
| """Create combined state by merging AdvancedAgentGraphState with the input schema.""" | ||
| if input_schema is None: | ||
| return AdvancedAgentGraphState | ||
| CompleteState = type( | ||
| "CompleteAdvancedAgentGraphState", | ||
| (AdvancedAgentGraphState, input_schema), | ||
| {}, | ||
| ) | ||
| cast(type[BaseModel], CompleteState).model_rebuild() | ||
| return CompleteState | ||
|
|
||
|
|
||
| class _AttachmentDownload(NamedTuple): | ||
| """One input attachment to download and patch back into the args.""" | ||
|
|
||
| location: Any | ||
| attachment_id: uuid.UUID | ||
| file_name: str | ||
| ticket: dict[str, Any] | ||
|
|
||
|
|
||
| async def resolve_input_attachments( | ||
| backend: BackendProtocol | BackendFactory | None, | ||
| attachment_paths: list[str], | ||
| input_args: dict[str, Any], | ||
| ) -> dict[str, Any]: | ||
| """Download attachment-shaped inputs into the backend and add a ``FilePath``. | ||
|
|
||
| Each ticket is streamed to ``<backend.cwd>/<ID>_<name>`` and augmented with a | ||
| ``FilePath`` so the agent's file tools can open it. FilesystemBackend only. | ||
| """ | ||
| if not isinstance(backend, FilesystemBackend): | ||
| raise NotImplementedError( | ||
| "Advanced agent with input attachments requires a FilesystemBackend, " | ||
| f"got {type(backend).__name__}" | ||
| ) | ||
|
|
||
| result = copy.deepcopy(input_args) | ||
| client = UiPath() | ||
|
|
||
| worklist: list[_AttachmentDownload] = [] | ||
| for path_expr in attachment_paths: | ||
| for match in jsonpath_parse(path_expr).find(result): | ||
| ticket = match.value | ||
| if not isinstance(ticket, dict) or "ID" not in ticket: | ||
| continue | ||
| att = Attachment.model_validate(ticket, from_attributes=True) | ||
| worklist.append( | ||
| _AttachmentDownload( | ||
| location=match.full_path, | ||
| attachment_id=att.id, | ||
| # basename only: full_name is caller-controlled, keep the | ||
| # download inside the workspace (no path traversal) | ||
| file_name=f"{att.id}_{Path(att.full_name).name}", | ||
| ticket=ticket, | ||
| ) | ||
| ) | ||
|
|
||
| logger.info( | ||
| "Downloading %d input attachment(s) into %s", len(worklist), backend.cwd | ||
| ) | ||
|
|
||
| await asyncio.gather( | ||
| *( | ||
| client.attachments.download_async( | ||
| key=item.attachment_id, | ||
| destination_path=str(backend.cwd / item.file_name), | ||
| ) | ||
| for item in worklist | ||
| ) | ||
| ) | ||
| for item in worklist: | ||
| item.location.update(result, {**item.ticket, "FilePath": f"/{item.file_name}"}) | ||
| return result |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.