From 9605b10976436685b0a305ddb851821dec137b3c Mon Sep 17 00:00:00 2001 From: Sahdev Garg Date: Wed, 7 May 2025 11:59:28 +0530 Subject: [PATCH] handling of telemetry labels from UI --- go/genkit/reflection.go | 22 +++++++++++++++++----- go/genkit/reflection_test.go | 6 ++++++ 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/go/genkit/reflection.go b/go/genkit/reflection.go index 67337ee77d..37852ee862 100644 --- a/go/genkit/reflection.go +++ b/go/genkit/reflection.go @@ -265,9 +265,10 @@ func handleRunAction(reg *registry.Registry) func(w http.ResponseWriter, r *http ctx := r.Context() var body struct { - Key string `json:"key"` - Input json.RawMessage `json:"input"` - Context json.RawMessage `json:"context"` + Key string `json:"key"` + Input json.RawMessage `json:"input"` + Context json.RawMessage `json:"context"` + TelemetryLabels json.RawMessage `json:"telemetryLabels"` } defer r.Body.Close() if err := json.NewDecoder(r.Body).Decode(&body); err != nil { @@ -303,7 +304,7 @@ func handleRunAction(reg *registry.Registry) func(w http.ResponseWriter, r *http json.Unmarshal(body.Context, &contextMap) } - resp, err := runAction(ctx, reg, body.Key, body.Input, cb, contextMap) + resp, err := runAction(ctx, reg, body.Key, body.Input, body.TelemetryLabels, cb, contextMap) if err != nil { if stream { reflectErr, err := json.Marshal(core.ToReflectionError(err)) @@ -379,7 +380,7 @@ type telemetry struct { TraceID string `json:"traceId"` } -func runAction(ctx context.Context, reg *registry.Registry, key string, input json.RawMessage, cb streamingCallback[json.RawMessage], runtimeContext map[string]any) (*runActionResponse, error) { +func runAction(ctx context.Context, reg *registry.Registry, key string, input json.RawMessage, telemetryLabels json.RawMessage, cb streamingCallback[json.RawMessage], runtimeContext map[string]any) (*runActionResponse, error) { action := reg.LookupAction(key) if action == nil { return nil, core.NewError(core.NOT_FOUND, "action %q not found", key) @@ -391,6 +392,17 @@ func runAction(ctx context.Context, reg *registry.Registry, key string, input js var traceID string output, err := tracing.RunInNewSpan(ctx, reg.TracingState(), "dev-run-action-wrapper", "", true, input, func(ctx context.Context, input json.RawMessage) (json.RawMessage, error) { tracing.SetCustomMetadataAttr(ctx, "genkit-dev-internal", "true") + // Set telemetry labels from payload to span + if telemetryLabels != nil { + var telemetryAttributes map[string]string + err := json.Unmarshal(telemetryLabels, &telemetryAttributes) + if err != nil { + return nil, core.NewError(core.INTERNAL, "Error unmarshalling telemetryLabels: %v", err) + } + for k, v := range telemetryAttributes { + tracing.SetCustomMetadataAttr(ctx, k, v) + } + } traceID = trace.SpanContextFromContext(ctx).TraceID().String() return action.RunJSON(ctx, input, cb) }) diff --git a/go/genkit/reflection_test.go b/go/genkit/reflection_test.go index 58a68634ac..d8469f29d7 100644 --- a/go/genkit/reflection_test.go +++ b/go/genkit/reflection_test.go @@ -151,6 +151,12 @@ func TestServeMux(t *testing.T) { wantStatus: http.StatusOK, wantResult: "2", }, + { + name: "check telemetry labels", + body: `{"key": "/custom/test/dec", "input": 3,"telemetryLabels":{"test_k":"test_v"}}`, + wantStatus: http.StatusOK, + wantResult: "2", + }, { name: "invalid action key", body: `{"key": "/custom/test/invalid", "input": 3}`,