From 8372e1627971c93ff57f37ef575b006d8e7cffff Mon Sep 17 00:00:00 2001 From: Max Smythe Date: Wed, 17 Jun 2026 23:17:33 +0000 Subject: [PATCH] add otel tracing to atenet --- cmd/atenet/internal/router.go | 1 + cmd/atenet/internal/router/extproc.go | 16 ++- cmd/atenet/internal/router/health.go | 21 ++-- cmd/atenet/internal/router/resumer.go | 7 ++ cmd/atenet/internal/router/router.go | 44 ++++++-- cmd/atenet/internal/router/status.go | 6 +- cmd/atenet/internal/router/xds.go | 125 ++++++++++++++++++++++- go.mod | 2 +- manifests/ate-install/atenet-router.yaml | 1 + 9 files changed, 199 insertions(+), 24 deletions(-) diff --git a/cmd/atenet/internal/router.go b/cmd/atenet/internal/router.go index 5cff97233..8592dc4c8 100644 --- a/cmd/atenet/internal/router.go +++ b/cmd/atenet/internal/router.go @@ -56,6 +56,7 @@ func NewRouterCmd() *cobra.Command { cmd.Flags().DurationVar(&cfg.HealthInterval, "health-interval", 1*time.Second, "Interval for checking health of dependent services") cmd.Flags().IntVar(&cfg.HttpsPort, "port-https", 8443, "TCP port for HTTPS workload traffic entering through the Envoy Router") cmd.Flags().StringVar(&cfg.EnvoyCertPath, "envoy-cert-path", "", "Path to the Envoy certificate file (if empty, a self-signed cert will be generated for testing)") + cmd.Flags().StringVar(&cfg.OtlpCollectorAddress, "otlp-collector-address", "", "host:port of the OTLP gRPC collector that Envoy reports tracing spans to (empty disables Envoy tracing)") return cmd } diff --git a/cmd/atenet/internal/router/extproc.go b/cmd/atenet/internal/router/extproc.go index 7c8c184e2..fee6d6fae 100644 --- a/cmd/atenet/internal/router/extproc.go +++ b/cmd/atenet/internal/router/extproc.go @@ -25,8 +25,11 @@ import ( extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" envoy_type "github.com/envoyproxy/go-control-plane/envoy/type/v3" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/propagation" "google.golang.org/grpc" "github.com/agent-substrate/substrate/pkg/proto/ateapipb" @@ -53,7 +56,9 @@ func NewExtProcServer(port int, apiClient ateapipb.ControlClient, routeDuration } func (s *ExtProcServer) Serve(ctx context.Context, lis net.Listener) error { - grpcServer := grpc.NewServer() + grpcServer := grpc.NewServer( + grpc.StatsHandler(otelgrpc.NewServerHandler()), + ) extprocv3.RegisterExternalProcessorServer(grpcServer, s) errChan := make(chan error, 1) @@ -127,6 +132,15 @@ func (s *ExtProcServer) handleRequestHeaders( reqHeaders *extprocv3.HttpHeaders, ) (*extprocv3.HeadersResponse, *requestMetadata, string, string, string, error) { metadata := newRequestMetadata(reqHeaders.Headers.GetHeaders()) + + // Envoy doesn't propagate trace context into the ext_proc gRPC + // stream's metadata — the per-request traceparent arrives in the + // HTTP headers carried inside the ProcessingRequest payload. Extract + // from there so our span links to the Envoy ingress span. + ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(metadata.headers)) + ctx, span := otel.Tracer(routerServiceName).Start(ctx, "ExtProc.RequestHeaders") + defer span.End() + slog.InfoContext(ctx, "Request", slog.String("metadata", metadata.String())) actorID, err := parseActorID(metadata.host) diff --git a/cmd/atenet/internal/router/health.go b/cmd/atenet/internal/router/health.go index 62ae1ae02..d9e1fddab 100644 --- a/cmd/atenet/internal/router/health.go +++ b/cmd/atenet/internal/router/health.go @@ -25,6 +25,7 @@ import ( "time" "github.com/agent-substrate/substrate/pkg/proto/ateapipb" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "k8s.io/client-go/kubernetes" ) @@ -50,10 +51,11 @@ type routerHealth struct { report RouterHealthReport - interval time.Duration - clientset kubernetes.Interface - apiClient ateapipb.ControlClient - cfg RouterConfig + interval time.Duration + clientset kubernetes.Interface + apiClient ateapipb.ControlClient + cfg RouterConfig + envoyClient *http.Client } func newRouterHealth(interval time.Duration, clientset kubernetes.Interface, apiClient ateapipb.ControlClient, cfg RouterConfig) *routerHealth { @@ -61,10 +63,11 @@ func newRouterHealth(interval time.Duration, clientset kubernetes.Interface, api interval = time.Second } return &routerHealth{ - interval: interval, - clientset: clientset, - apiClient: apiClient, - cfg: cfg, + interval: interval, + clientset: clientset, + apiClient: apiClient, + cfg: cfg, + envoyClient: &http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)}, } } @@ -152,7 +155,7 @@ func (rh *routerHealth) checkEnvoy(ctx context.Context) (bool, string) { return false, err.Error() } - resp, err := http.DefaultClient.Do(req) + resp, err := rh.envoyClient.Do(req) if err != nil { return false, err.Error() } diff --git a/cmd/atenet/internal/router/resumer.go b/cmd/atenet/internal/router/resumer.go index 2f9a420f9..5ed2ad040 100644 --- a/cmd/atenet/internal/router/resumer.go +++ b/cmd/atenet/internal/router/resumer.go @@ -19,6 +19,9 @@ import ( "time" "github.com/agent-substrate/substrate/pkg/proto/ateapipb" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "golang.org/x/sync/singleflight" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -40,6 +43,10 @@ func NewActorResumer(apiClient ateapipb.ControlClient) *ActorResumer { // ResumeActor ensures the requested actor is running. It deduplicates concurrent // requests within the process and retries when needed. func (r *ActorResumer) ResumeActor(ctx context.Context, actorID string) (*ateapipb.Actor, error) { + ctx, span := otel.Tracer(routerServiceName).Start(ctx, "ResumeActor", + trace.WithAttributes(attribute.String("actor_id", actorID))) + defer span.End() + ch := r.flight.DoChan(actorID, func() (interface{}, error) { // We detach the context from the first caller using a fixed background timeout. // This guarantees that if Caller 1 disconnects or times out, the underlying diff --git a/cmd/atenet/internal/router/router.go b/cmd/atenet/internal/router/router.go index 797ce36c0..98f7f9c43 100644 --- a/cmd/atenet/internal/router/router.go +++ b/cmd/atenet/internal/router/router.go @@ -35,6 +35,9 @@ import ( "time" "github.com/spf13/cobra" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + sdktrace "go.opentelemetry.io/otel/sdk/trace" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -78,6 +81,9 @@ type RouterConfig struct { EnvoyCertPath string LogLevel string MetricsAddr string + // OtlpCollectorAddress is the host:port of the OTLP gRPC collector that + // Envoy reports tracing spans to. Empty disables Envoy-side tracing. + OtlpCollectorAddress string } // RouterServer instantiates and coordinates runtime threads executing system modules. @@ -96,7 +102,6 @@ type RouterServer struct { func NewRouterServer(cfg RouterConfig) (*RouterServer, error) { var k8sClient client.Client var clientset kubernetes.Interface - var err error if cfg.TemplatesFile == "" { k8sCfg, err := config.GetConfig() @@ -125,14 +130,6 @@ func NewRouterServer(cfg RouterConfig) (*RouterServer, error) { } } - conn, err := grpc.NewClient(cfg.AteapiAddr, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}))) - if err != nil { - return nil, fmt.Errorf("failed to establish grpc channel to ateapi client: %w", err) - } - slog.Info("Connecting to ateapi", slog.String("address", cfg.AteapiAddr)) - - apiClient := ateapipb.NewControlClient(conn) - var store atStore if cfg.TemplatesFile != "" { store = newFileATStore(cfg.TemplatesFile) @@ -144,7 +141,6 @@ func NewRouterServer(cfg RouterConfig) (*RouterServer, error) { cfg: cfg, k8sClient: k8sClient, clientset: clientset, - apiClient: apiClient, atStore: store, }, nil } @@ -173,6 +169,18 @@ func (s *RouterServer) Run(ctx context.Context) error { } slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: level}))) + // Tracing must be initialized before constructing the ateapi gRPC client + // below, because otelgrpc.NewClientHandler captures the global + // TracerProvider at construction time. + tp, err := serverboot.InitTracing(ctx, serverboot.TracingOptions{ + ServiceName: routerServiceName, + Sampler: sdktrace.ParentBased(sdktrace.NeverSample()), + }) + if err != nil { + return fmt.Errorf("failed to initialize tracing: %w", err) + } + defer serverboot.ShutdownProvider("TracerProvider", tp.Shutdown) + mp, err := serverboot.InitMetrics(ctx, routerServiceName) if err != nil { return fmt.Errorf("failed to initialize metrics: %w", err) @@ -181,12 +189,26 @@ func (s *RouterServer) Run(ctx context.Context) error { go serverboot.StartMetricsServer(ctx, serverboot.MetricsServerOptions{Addr: s.cfg.MetricsAddr}) + conn, err := grpc.NewClient( + s.cfg.AteapiAddr, + grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})), + grpc.WithStatsHandler(otelgrpc.NewClientHandler()), + ) + if err != nil { + return fmt.Errorf("failed to establish grpc channel to ateapi client: %w", err) + } + slog.InfoContext(ctx, "Connecting to ateapi", slog.String("address", s.cfg.AteapiAddr)) + s.apiClient = ateapipb.NewControlClient(conn) + slog.InfoContext(ctx, "Starting substrate router subsystem", slog.Bool("standalone", s.cfg.Standalone)) g, ctx := errgroup.WithContext(ctx) xdsSrv := NewXdsServer(s.cfg.XdsPort) xdsSrv.SetConfig(s.cfg.HttpPort, s.cfg.ExtprocPort, s.cfg.ExtprocAddr) + if err := xdsSrv.SetOtlpCollector(s.cfg.OtlpCollectorAddress); err != nil { + return fmt.Errorf("configure OTLP collector: %w", err) + } var certContent, keyContent string if s.cfg.EnvoyCertPath == "" { @@ -258,7 +280,7 @@ func (s *RouterServer) Run(ctx context.Context) error { mux.HandleFunc("/statusz", s.handleStatusz) httpServer := &http.Server{ - Handler: mux, + Handler: otelhttp.NewHandler(mux, "/"), } g.Go(func() error { diff --git a/cmd/atenet/internal/router/status.go b/cmd/atenet/internal/router/status.go index 3d5f2dc74..d21081e8a 100644 --- a/cmd/atenet/internal/router/status.go +++ b/cmd/atenet/internal/router/status.go @@ -29,6 +29,7 @@ import ( "time" "github.com/spf13/pflag" + "go.opentelemetry.io/otel" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -171,7 +172,10 @@ func (s *RouterServer) getRouterIP(ctx context.Context) string { } func (s *RouterServer) handleStatusz(w http.ResponseWriter, req *http.Request) { - ctx, cancel := context.WithTimeout(req.Context(), 3*time.Second) + ctx, span := otel.Tracer(routerServiceName).Start(req.Context(), "handleStatusz") + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() routerIP := s.getRouterIP(ctx) diff --git a/cmd/atenet/internal/router/xds.go b/cmd/atenet/internal/router/xds.go index 964fc5e92..e7e1602a9 100644 --- a/cmd/atenet/internal/router/xds.go +++ b/cmd/atenet/internal/router/xds.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/durationpb" @@ -35,6 +36,7 @@ import ( endpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + tracev3 "github.com/envoyproxy/go-control-plane/envoy/config/trace/v3" streamaccesslogv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/stream/v3" dfpclusterv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/clusters/dynamic_forward_proxy/v3" dfpcommonv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/common/dynamic_forward_proxy/v3" @@ -50,6 +52,7 @@ import ( endpointgrpc "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3" listenergrpc "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3" routegrpc "github.com/envoyproxy/go-control-plane/envoy/service/route/v3" + typev3 "github.com/envoyproxy/go-control-plane/envoy/type/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" @@ -62,6 +65,7 @@ const ( IngressHTTPSListener = "ingress_https_listener" RouteName = "substrate_routes" ClusterName = "ate-cluster" + OtlpClusterName = "otel_collector_cluster" ) // XdsServer implements an aggregated discovery service server for dynamic Envoy router nodes. @@ -80,6 +84,9 @@ type XdsServer struct { certPath string certContent string keyContent string + + otlpHost string + otlpPort uint32 } func NewXdsServer(xdsPort int) *XdsServer { @@ -113,6 +120,31 @@ func (x *XdsServer) SetTlsConfig(httpsPort int, certPath string, certContent str x.keyContent = keyContent } +// SetOtlpCollector enables Envoy-side tracing pointed at the OTLP gRPC +// collector at host:port. addr empty disables tracing. port defaults to +// 4317 if omitted. +func (x *XdsServer) SetOtlpCollector(addr string) error { + x.mu.Lock() + defer x.mu.Unlock() + if addr == "" { + x.otlpHost = "" + x.otlpPort = 0 + return nil + } + host, portStr, err := net.SplitHostPort(addr) + if err != nil { + host = addr + portStr = "4317" + } + port, err := strconv.ParseUint(portStr, 10, 32) + if err != nil { + return fmt.Errorf("parse OTLP collector port from %q: %w", addr, err) + } + x.otlpHost = host + x.otlpPort = uint32(port) + return nil +} + func (x *XdsServer) UpdateSnapshot() error { x.mu.Lock() defer x.mu.Unlock() @@ -125,6 +157,9 @@ func (x *XdsServer) UpdateSnapshot() error { x.buildCluster(), x.buildDynamicForwardProxyCluster(), } + if x.otlpHost != "" { + clusters = append(clusters, x.buildOtlpCollectorCluster()) + } // Routes routes := []types.Resource{ @@ -164,7 +199,9 @@ func (x *XdsServer) Serve(ctx context.Context, lis net.Listener) error { slog.ErrorContext(ctx, "Warning - initial xDS setup update failed", slog.String("err", err.Error())) } - grpcServer := grpc.NewServer() + grpcServer := grpc.NewServer( + grpc.StatsHandler(otelgrpc.NewServerHandler()), + ) discoverygrpc.RegisterAggregatedDiscoveryServiceServer(grpcServer, x.srv) clustergrpc.RegisterClusterDiscoveryServiceServer(grpcServer, x.srv) endpointgrpc.RegisterEndpointDiscoveryServiceServer(grpcServer, x.srv) @@ -246,6 +283,56 @@ func buildDnsCacheConfig() *dfpcommonv3.DnsCacheConfig { } } +// buildOtlpCollectorCluster builds a STRICT_DNS HTTP/2 cluster that +// targets the OTLP gRPC collector. Required when HCM tracing is enabled +// so Envoy has somewhere to ship spans. +func (x *XdsServer) buildOtlpCollectorCluster() *clusterv3.Cluster { + h2Opts, _ := anypb.New(&httpv3.HttpProtocolOptions{ + UpstreamProtocolOptions: &httpv3.HttpProtocolOptions_ExplicitHttpConfig_{ + ExplicitHttpConfig: &httpv3.HttpProtocolOptions_ExplicitHttpConfig{ + ProtocolConfig: &httpv3.HttpProtocolOptions_ExplicitHttpConfig_Http2ProtocolOptions{}, + }, + }, + }) + + return &clusterv3.Cluster{ + Name: OtlpClusterName, + ConnectTimeout: durationpb.New(1 * time.Second), + ClusterDiscoveryType: &clusterv3.Cluster_Type{ + Type: clusterv3.Cluster_STRICT_DNS, + }, + LbPolicy: clusterv3.Cluster_ROUND_ROBIN, + LoadAssignment: &endpointv3.ClusterLoadAssignment{ + ClusterName: OtlpClusterName, + Endpoints: []*endpointv3.LocalityLbEndpoints{ + { + LbEndpoints: []*endpointv3.LbEndpoint{ + { + HostIdentifier: &endpointv3.LbEndpoint_Endpoint{ + Endpoint: &endpointv3.Endpoint{ + Address: &corev3.Address{ + Address: &corev3.Address_SocketAddress{ + SocketAddress: &corev3.SocketAddress{ + Address: x.otlpHost, + PortSpecifier: &corev3.SocketAddress_PortValue{ + PortValue: x.otlpPort, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + TypedExtensionProtocolOptions: map[string]*anypb.Any{ + "envoy.extensions.upstreams.http.v3.HttpProtocolOptions": h2Opts, + }, + } +} + func (x *XdsServer) buildDynamicForwardProxyCluster() *clusterv3.Cluster { dfpClusterConfig := &dfpclusterv3.ClusterConfig{ ClusterImplementationSpecifier: &dfpclusterv3.ClusterConfig_DnsCacheConfig{ @@ -334,6 +421,7 @@ func (x *XdsServer) buildHcm(statPrefix string) *anypb.Any { hcm, _ := anypb.New(&hcmv3.HttpConnectionManager{ StatPrefix: statPrefix, GenerateRequestId: &wrapperspb.BoolValue{Value: true}, + Tracing: x.buildTracing(), AccessLog: []*accesslogv3.AccessLog{ { Name: "envoy.access_loggers.stdout", @@ -377,6 +465,41 @@ func (x *XdsServer) buildHcm(statPrefix string) *anypb.Any { return hcm } +// buildTracing returns the HCM Tracing block that points Envoy at the +// configured OTLP gRPC collector. Returns nil when no collector is set, +// in which case Envoy emits no spans on its own. +// +// `RandomSampling: 100%` makes Envoy ALWAYS sample requests that arrive +// with no parent traceparent. We rely on upstream clients (locust, etc.) +// to gate sampling: requests without a sampled parent are still tagged +// `sampled` here but downstream services in this repo use +// `ParentBased(NeverSample)` so unsampled-by-client requests stay +// unsampled overall. +func (x *XdsServer) buildTracing() *hcmv3.HttpConnectionManager_Tracing { + if x.otlpHost == "" { + return nil + } + otelConfig, _ := anypb.New(&tracev3.OpenTelemetryConfig{ + GrpcService: &corev3.GrpcService{ + TargetSpecifier: &corev3.GrpcService_EnvoyGrpc_{ + EnvoyGrpc: &corev3.GrpcService_EnvoyGrpc{ + ClusterName: OtlpClusterName, + }, + }, + }, + ServiceName: "atenet-router-envoy", + }) + return &hcmv3.HttpConnectionManager_Tracing{ + RandomSampling: &typev3.Percent{Value: 100}, + Provider: &tracev3.Tracing_Http{ + Name: "envoy.tracers.opentelemetry", + ConfigType: &tracev3.Tracing_Http_TypedConfig{ + TypedConfig: otelConfig, + }, + }, + } +} + func (x *XdsServer) buildListener() *listenerv3.Listener { hcm := x.buildHcm("ingress_http") diff --git a/go.mod b/go.mod index 16b41313f..d53c76777 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/vishvananda/netlink v1.3.1 github.com/vishvananda/netns v0.0.5 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.68.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 go.opentelemetry.io/otel v1.43.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.43.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.43.0 @@ -142,7 +143,6 @@ require ( github.com/yuin/gopher-lua v1.1.1 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.42.0 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 // indirect go.opentelemetry.io/proto/otlp v1.10.0 // indirect go.uber.org/atomic v1.11.0 // indirect diff --git a/manifests/ate-install/atenet-router.yaml b/manifests/ate-install/atenet-router.yaml index 43f69ab5e..e4ebd1ecc 100644 --- a/manifests/ate-install/atenet-router.yaml +++ b/manifests/ate-install/atenet-router.yaml @@ -135,6 +135,7 @@ spec: - "--status-port=4040" - "--port-https=8443" - "--envoy-cert-path=/run/servicedns.podcert.ate.dev/credential-bundle.pem" + - "--otlp-collector-address=opentelemetry-collector.gke-managed-otel.svc.cluster.local:4317" env: - name: POD_NAME valueFrom: