@@ -265,9 +265,10 @@ func handleRunAction(reg *registry.Registry) func(w http.ResponseWriter, r *http
265
265
ctx := r .Context ()
266
266
267
267
var body struct {
268
- Key string `json:"key"`
269
- Input json.RawMessage `json:"input"`
270
- Context json.RawMessage `json:"context"`
268
+ Key string `json:"key"`
269
+ Input json.RawMessage `json:"input"`
270
+ Context json.RawMessage `json:"context"`
271
+ TelemetryLabels json.RawMessage `json:"telemetryLabels"`
271
272
}
272
273
defer r .Body .Close ()
273
274
if err := json .NewDecoder (r .Body ).Decode (& body ); err != nil {
@@ -303,7 +304,7 @@ func handleRunAction(reg *registry.Registry) func(w http.ResponseWriter, r *http
303
304
json .Unmarshal (body .Context , & contextMap )
304
305
}
305
306
306
- resp , err := runAction (ctx , reg , body .Key , body .Input , cb , contextMap )
307
+ resp , err := runAction (ctx , reg , body .Key , body .Input , body . TelemetryLabels , cb , contextMap )
307
308
if err != nil {
308
309
if stream {
309
310
reflectErr , err := json .Marshal (core .ToReflectionError (err ))
@@ -379,7 +380,7 @@ type telemetry struct {
379
380
TraceID string `json:"traceId"`
380
381
}
381
382
382
- func runAction (ctx context.Context , reg * registry.Registry , key string , input json.RawMessage , cb streamingCallback [json.RawMessage ], runtimeContext map [string ]any ) (* runActionResponse , error ) {
383
+ func runAction (ctx context.Context , reg * registry.Registry , key string , input json.RawMessage , telemetryLabels json. RawMessage , cb streamingCallback [json.RawMessage ], runtimeContext map [string ]any ) (* runActionResponse , error ) {
383
384
action := reg .LookupAction (key )
384
385
if action == nil {
385
386
return nil , core .NewError (core .NOT_FOUND , "action %q not found" , key )
@@ -391,6 +392,17 @@ func runAction(ctx context.Context, reg *registry.Registry, key string, input js
391
392
var traceID string
392
393
output , err := tracing .RunInNewSpan (ctx , reg .TracingState (), "dev-run-action-wrapper" , "" , true , input , func (ctx context.Context , input json.RawMessage ) (json.RawMessage , error ) {
393
394
tracing .SetCustomMetadataAttr (ctx , "genkit-dev-internal" , "true" )
395
+ // Set telemetry labels from payload to span
396
+ if telemetryLabels != nil {
397
+ var telemetryAttributes map [string ]string
398
+ err := json .Unmarshal (telemetryLabels , & telemetryAttributes )
399
+ if err != nil {
400
+ return nil , core .NewError (core .INTERNAL , "Error unmarshalling telemetryLabels: %v" , err )
401
+ }
402
+ for k , v := range telemetryAttributes {
403
+ tracing .SetCustomMetadataAttr (ctx , k , v )
404
+ }
405
+ }
394
406
traceID = trace .SpanContextFromContext (ctx ).TraceID ().String ()
395
407
return action .RunJSON (ctx , input , cb )
396
408
})
0 commit comments