Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 62 additions & 78 deletions src/uipath_langchain/agent/tools/escalation_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,13 @@ async def aretrieve(
otel_trace = None # type: ignore[assignment]

# Span attribute keys matching what the LlmOpsHttpExporter and
# Studio UI expect. "openinference.span.kind" sets SpanType.
# Studio UI expect. "openinference.span.kind" sets SpanType;
# "agentMemoryLookup" renders as "Memory lookup" in the trace view.
# Unlike episodic memory, escalation memory emits no
# "applyDynamicFewShot" child span: that span type's renderer only
# displays episodic feedback items (response.results[].feedback),
# which escalation results (results[].answer) never contain, so the
# child rendered as an empty "Dynamic few-shot" span.
lookup_span_ctx = (
tracer.start_as_current_span(
"Find previous memories",
Expand All @@ -140,81 +146,51 @@ async def aretrieve(
)

with lookup_span_ctx as lookup_span:
fewshot_span_ctx = (
tracer.start_as_current_span(
"Apply escalation memory",
attributes={
# LlmOps/Studio still key memory rendering off this
# exported span type; rename it when that contract changes.
"openinference.span.kind": "applyDynamicFewShot",
"type": "applyDynamicFewShot",
"span_type": "applyDynamicFewShot",
"uipath.custom_instrumentation": True,
"memorySpaceName": self.memory_space_name,
"memorySpaceId": self.memory_space_id,
"strategy": ESCALATION_MEMORY_STRATEGY,
},
)
if tracer
else _noop_context()
)

with fewshot_span_ctx as fewshot_span:
try:
try:
try:
response = await sdk.memory.escalation_search_async(
memory_space_id=self.memory_space_id,
request=request,
folder_path=self.folder_path,
)
except ValidationError:
# Some existing escalation memories store `answer` as a
# JSON string that the SDK response model rejects. The
# raw API payload is still usable and parsed below.
response = await self._raw_escalation_search(sdk, request)

results = _read_value(response, "results") or []
results_count = _safe_len(results)
cached_result = _cached_result_from_search_response(response)
# Set request/response on fewshot span as JSON strings.
# The exporter parses JSON strings back to objects.
# The UI reads "response" to display matched memory items.
if fewshot_span and hasattr(fewshot_span, "set_attribute"):
fewshot_span.set_attribute(
"request",
_json_dumps(
request.model_dump(by_alias=True, exclude_none=True)
),
)
fewshot_span.set_attribute(
"response",
_serialize_search_response_for_trace(response),
)
fewshot_span.set_attribute(
"fromMemory", cached_result is not None
)
except Exception as error:
error_detail = repr(error)
if otel_trace:
if fewshot_span and hasattr(fewshot_span, "set_status"):
fewshot_span.set_status(
otel_trace.StatusCode.ERROR, error_detail
)
if lookup_span and hasattr(lookup_span, "set_status"):
lookup_span.set_status(
otel_trace.StatusCode.ERROR, error_detail
)
raise

if lookup_span and hasattr(lookup_span, "set_attribute"):
lookup_span.set_attribute("memoryItemsMatched", results_count)
if cached_result is not None:
response = await sdk.memory.escalation_search_async(
memory_space_id=self.memory_space_id,
request=request,
folder_path=self.folder_path,
)
except ValidationError:
# Some existing escalation memories store `answer` as a
# JSON string that the SDK response model rejects. The
# raw API payload is still usable and parsed below.
response = await self._raw_escalation_search(sdk, request)

results = _read_value(response, "results") or []
results_count = _safe_len(results)
cached_result = _cached_result_from_search_response(response)
# Set request/response as JSON strings; the exporter parses
# them back to objects for the Studio UI.
if lookup_span and hasattr(lookup_span, "set_attribute"):
lookup_span.set_attribute(
"result",
"request",
_json_dumps(
cached_result.model_dump(by_alias=True, exclude_none=True)
request.model_dump(by_alias=True, exclude_none=True)
),
)
lookup_span.set_attribute(
"response",
_serialize_search_response_for_trace(response),
)
lookup_span.set_attribute("fromMemory", cached_result is not None)
lookup_span.set_attribute("memoryItemsMatched", results_count)
if cached_result is not None:
lookup_span.set_attribute(
"result",
_json_dumps(
cached_result.model_dump(
by_alias=True, exclude_none=True
)
),
)
except Exception as error:
error_detail = repr(error)
if otel_trace and lookup_span and hasattr(lookup_span, "set_status"):
lookup_span.set_status(otel_trace.StatusCode.ERROR, error_detail)
raise

return cached_result

Expand Down Expand Up @@ -594,14 +570,16 @@ async def _ingest_escalation_memory(
otel_trace = None # type: ignore[assignment]

# Span attribute keys match what the LlmOpsHttpExporter and Studio UI expect;
# "openinference.span.kind" sets the SpanType.
# "openinference.span.kind" sets the SpanType. "agentMemoryStore" renders
# as "Memory store" in the trace view; its Results tab displays the
# "inputs"/"outputs" attributes set below.
ingest_span_ctx = (
tracer.start_as_current_span(
"Save escalation memory",
attributes={
"openinference.span.kind": "agentMemoryWrite",
"type": "agentMemoryWrite",
"span_type": "agentMemoryWrite",
"openinference.span.kind": "agentMemoryStore",
"type": "agentMemoryStore",
"span_type": "agentMemoryStore",
"uipath.custom_instrumentation": True,
"memorySpaceName": memory_space_name or "",
"memorySpaceId": memory_space_id,
Expand All @@ -624,11 +602,11 @@ async def _ingest_escalation_memory(
user_id=normalized_user_id,
)
# Record what was saved as a JSON string; the exporter parses it
# back to an object and the Studio UI reads "request" to display
# the persisted memory item. Mirrors the lookup span's "request".
# back to an object and the Studio UI shows "inputs"/"outputs"
# in the Results tab of "agentMemoryStore" spans.
if ingest_span and hasattr(ingest_span, "set_attribute"):
ingest_span.set_attribute(
"request",
"inputs",
_json_dumps(request.model_dump(by_alias=True, exclude_none=True)),
)
sdk = UiPath()
Expand All @@ -639,12 +617,18 @@ async def _ingest_escalation_memory(
)
if ingest_span and hasattr(ingest_span, "set_attribute"):
ingest_span.set_attribute("savedToMemory", True)
ingest_span.set_attribute(
"outputs", _json_dumps({"savedToMemory": True})
)
logger.info(
"Ingested escalation outcome into memory space '%s'", memory_space_id
)
except Exception as error:
if ingest_span and hasattr(ingest_span, "set_attribute"):
ingest_span.set_attribute("savedToMemory", False)
ingest_span.set_attribute(
"outputs", _json_dumps({"savedToMemory": False})
)
if otel_trace and ingest_span and hasattr(ingest_span, "set_status"):
ingest_span.set_status(otel_trace.StatusCode.ERROR, repr(error))
if ingest_span and hasattr(ingest_span, "record_exception"):
Expand Down
95 changes: 45 additions & 50 deletions tests/agent/tools/test_escalation_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,50 +692,40 @@ def get_tracer(name: str) -> _FakeTracer:
)

assert result is not None
# A single "Memory lookup" span; no "applyDynamicFewShot" child —
# the Studio UI renders that span type with the episodic feedback
# renderer, which cannot display escalation results.
assert [span.name for span in fake_tracer.spans] == [
"Find previous memories",
"Apply escalation memory",
]
assert tracer_names == ["uipath_langchain.memory"]
lookup_span, apply_memory_span = fake_tracer.spans
assert lookup_span.attributes == {
"openinference.span.kind": "agentMemoryLookup",
"type": "agentMemoryLookup",
"span_type": "agentMemoryLookup",
"uipath.custom_instrumentation": True,
"memorySpaceName": "MemorySpace",
"memorySpaceId": "space-123",
"strategy": ESCALATION_MEMORY_STRATEGY,
"memoryItemsMatched": 1,
"result": json.dumps(
{
"output": {
"action": "approve",
"reason": "meets criteria",
},
"outcome": "approved",
}
),
}
assert (
apply_memory_span.attributes["openinference.span.kind"]
== "applyDynamicFewShot"
)
assert apply_memory_span.attributes["type"] == "applyDynamicFewShot"
assert apply_memory_span.attributes["span_type"] == "applyDynamicFewShot"
assert apply_memory_span.attributes["uipath.custom_instrumentation"] is True
assert apply_memory_span.attributes["memorySpaceName"] == "MemorySpace"
assert apply_memory_span.attributes["memorySpaceId"] == "space-123"
assert apply_memory_span.attributes["strategy"] == ESCALATION_MEMORY_STRATEGY
assert apply_memory_span.attributes["fromMemory"] is True
request_payload = json.loads(str(apply_memory_span.attributes["request"]))
(lookup_span,) = fake_tracer.spans
assert lookup_span.attributes["openinference.span.kind"] == "agentMemoryLookup"
assert lookup_span.attributes["type"] == "agentMemoryLookup"
assert lookup_span.attributes["span_type"] == "agentMemoryLookup"
assert lookup_span.attributes["uipath.custom_instrumentation"] is True
assert lookup_span.attributes["memorySpaceName"] == "MemorySpace"
assert lookup_span.attributes["memorySpaceId"] == "space-123"
assert lookup_span.attributes["strategy"] == ESCALATION_MEMORY_STRATEGY
assert lookup_span.attributes["fromMemory"] is True
assert lookup_span.attributes["memoryItemsMatched"] == 1
assert lookup_span.attributes["result"] == json.dumps(
{
"output": {
"action": "approve",
"reason": "meets criteria",
},
"outcome": "approved",
}
)
request_payload = json.loads(str(lookup_span.attributes["request"]))
assert request_payload["fields"][0]["keyPath"] == [
"escalation-input",
"Content",
]
assert request_payload["fields"][0]["value"] == "Is the sky blue?"
assert request_payload["definitionSystemPrompt"] == ""
response_payload = json.loads(str(apply_memory_span.attributes["response"]))
response_payload = json.loads(str(lookup_span.attributes["response"]))
assert response_payload["results"][0]["answer"]["outcome"] == "approved"
mock_record_metric.assert_called_once_with(MEMORY_CACHE_HIT_METRIC, "space-123")

Expand Down Expand Up @@ -778,16 +768,15 @@ def get_tracer(name: str) -> _FakeTracer:

assert result is None
assert tracer_names == ["uipath_langchain.memory"]
lookup_span, apply_memory_span = fake_tracer.spans
(lookup_span,) = fake_tracer.spans
assert lookup_span.attributes["memoryItemsMatched"] == 0
assert lookup_span.attributes["memorySpaceName"] == "MemorySpace"
assert lookup_span.attributes["strategy"] == ESCALATION_MEMORY_STRATEGY
assert lookup_span.attributes["fromMemory"] is False
assert "result" not in lookup_span.attributes
assert apply_memory_span.attributes["memorySpaceName"] == "MemorySpace"
assert apply_memory_span.attributes["strategy"] == ESCALATION_MEMORY_STRATEGY
assert apply_memory_span.attributes["fromMemory"] is False
request_payload = json.loads(str(apply_memory_span.attributes["request"]))
request_payload = json.loads(str(lookup_span.attributes["request"]))
assert request_payload["fields"][0]["value"] == "Is the sky blue?"
response_payload = json.loads(str(apply_memory_span.attributes["response"]))
response_payload = json.loads(str(lookup_span.attributes["response"]))
assert response_payload == {"results": []}
mock_record_metric.assert_called_once_with(
MEMORY_CACHE_MISS_METRIC, "space-123"
Expand All @@ -813,10 +802,9 @@ async def test_sets_memory_lookup_spans_to_error_on_search_failure(
uipath_sdk=mock_sdk,
).aretrieve({"Content": "Is the sky blue?"})

lookup_span, apply_memory_span = fake_tracer.spans
(lookup_span,) = fake_tracer.spans
expected_status = (trace.StatusCode.ERROR, "RuntimeError('search down')")
assert lookup_span.statuses == [expected_status]
assert apply_memory_span.statuses == [expected_status]

@pytest.mark.asyncio
@patch("uipath_langchain.agent.tools.escalation_memory._record_custom_metric")
Expand Down Expand Up @@ -1113,22 +1101,26 @@ def get_tracer(name: str) -> _FakeTracer:
assert tracer_names == ["uipath_langchain.memory"]
assert [span.name for span in fake_tracer.spans] == ["Save escalation memory"]
(write_span,) = fake_tracer.spans
assert write_span.attributes["openinference.span.kind"] == "agentMemoryWrite"
assert write_span.attributes["type"] == "agentMemoryWrite"
assert write_span.attributes["span_type"] == "agentMemoryWrite"
assert write_span.attributes["openinference.span.kind"] == "agentMemoryStore"
assert write_span.attributes["type"] == "agentMemoryStore"
assert write_span.attributes["span_type"] == "agentMemoryStore"
assert write_span.attributes["uipath.custom_instrumentation"] is True
assert write_span.attributes["memorySpaceName"] == "MemorySpace"
assert write_span.attributes["memorySpaceId"] == "space-123"
assert write_span.attributes["strategy"] == ESCALATION_MEMORY_STRATEGY
assert write_span.attributes["fromMemory"] is False
assert write_span.attributes["savedToMemory"] is True
# "request" captures what was saved as a JSON string the exporter
# parses back into an object for the Studio UI.
saved_request = write_span.attributes["request"]
assert isinstance(saved_request, str)
saved = json.loads(saved_request)
# "inputs" captures what was saved as a JSON string the exporter
# parses back into an object; the Studio UI shows inputs/outputs
# in the Results tab of "agentMemoryStore" spans.
saved_inputs = write_span.attributes["inputs"]
assert isinstance(saved_inputs, str)
saved = json.loads(saved_inputs)
assert saved["answer"] == '{"approved": true}'
assert saved["attributes"] == '{"input": "test"}'
assert json.loads(str(write_span.attributes["outputs"])) == {
"savedToMemory": True
}

@pytest.mark.asyncio
async def test_sets_memory_write_span_to_error_on_ingest_failure(
Expand Down Expand Up @@ -1158,6 +1150,9 @@ async def test_sets_memory_write_span_to_error_on_ingest_failure(

(write_span,) = fake_tracer.spans
assert write_span.attributes["savedToMemory"] is False
assert json.loads(str(write_span.attributes["outputs"])) == {
"savedToMemory": False
}
assert write_span.recorded_errors == [error]
assert write_span.statuses

Expand Down
Loading