Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/atenet/internal/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func NewRouterCmd() *cobra.Command {
cmd.Flags().DurationVar(&cfg.HealthInterval, "health-interval", 1*time.Second, "Interval for checking health of dependent services")
cmd.Flags().IntVar(&cfg.HttpsPort, "port-https", 8443, "TCP port for HTTPS workload traffic entering through the Envoy Router")
cmd.Flags().StringVar(&cfg.EnvoyCertPath, "envoy-cert-path", "", "Path to the Envoy certificate file (if empty, a self-signed cert will be generated for testing)")
cmd.Flags().StringVar(&cfg.OtlpCollectorAddress, "otlp-collector-address", "", "host:port of the OTLP gRPC collector that Envoy reports tracing spans to (empty disables Envoy tracing)")

return cmd
}
16 changes: 15 additions & 1 deletion cmd/atenet/internal/router/extproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ import (

extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
envoy_type "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
"google.golang.org/grpc"

"github.com/agent-substrate/substrate/pkg/proto/ateapipb"
Expand All @@ -53,7 +56,9 @@ func NewExtProcServer(port int, apiClient ateapipb.ControlClient, routeDuration
}

func (s *ExtProcServer) Serve(ctx context.Context, lis net.Listener) error {
grpcServer := grpc.NewServer()
grpcServer := grpc.NewServer(
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)
extprocv3.RegisterExternalProcessorServer(grpcServer, s)

errChan := make(chan error, 1)
Expand Down Expand Up @@ -127,6 +132,15 @@ func (s *ExtProcServer) handleRequestHeaders(
reqHeaders *extprocv3.HttpHeaders,
) (*extprocv3.HeadersResponse, *requestMetadata, string, string, string, error) {
metadata := newRequestMetadata(reqHeaders.Headers.GetHeaders())

// Envoy doesn't propagate trace context into the ext_proc gRPC
// stream's metadata — the per-request traceparent arrives in the
// HTTP headers carried inside the ProcessingRequest payload. Extract
// from there so our span links to the Envoy ingress span.
ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(metadata.headers))
ctx, span := otel.Tracer(routerServiceName).Start(ctx, "ExtProc.RequestHeaders")
defer span.End()

slog.InfoContext(ctx, "Request", slog.String("metadata", metadata.String()))

actorID, err := parseActorID(metadata.host)
Expand Down
21 changes: 12 additions & 9 deletions cmd/atenet/internal/router/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/agent-substrate/substrate/pkg/proto/ateapipb"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"k8s.io/client-go/kubernetes"
)

Expand All @@ -50,21 +51,23 @@ type routerHealth struct {

report RouterHealthReport

interval time.Duration
clientset kubernetes.Interface
apiClient ateapipb.ControlClient
cfg RouterConfig
interval time.Duration
clientset kubernetes.Interface
apiClient ateapipb.ControlClient
cfg RouterConfig
envoyClient *http.Client
}

func newRouterHealth(interval time.Duration, clientset kubernetes.Interface, apiClient ateapipb.ControlClient, cfg RouterConfig) *routerHealth {
if interval <= 0 {
interval = time.Second
}
return &routerHealth{
interval: interval,
clientset: clientset,
apiClient: apiClient,
cfg: cfg,
interval: interval,
clientset: clientset,
apiClient: apiClient,
cfg: cfg,
envoyClient: &http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)},
}
}

Expand Down Expand Up @@ -152,7 +155,7 @@ func (rh *routerHealth) checkEnvoy(ctx context.Context) (bool, string) {
return false, err.Error()
}

resp, err := http.DefaultClient.Do(req)
resp, err := rh.envoyClient.Do(req)
if err != nil {
return false, err.Error()
}
Expand Down
7 changes: 7 additions & 0 deletions cmd/atenet/internal/router/resumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"time"

"github.com/agent-substrate/substrate/pkg/proto/ateapipb"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/singleflight"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -40,6 +43,10 @@ func NewActorResumer(apiClient ateapipb.ControlClient) *ActorResumer {
// ResumeActor ensures the requested actor is running. It deduplicates concurrent
// requests within the process and retries when needed.
func (r *ActorResumer) ResumeActor(ctx context.Context, actorID string) (*ateapipb.Actor, error) {
ctx, span := otel.Tracer(routerServiceName).Start(ctx, "ResumeActor",
trace.WithAttributes(attribute.String("actor_id", actorID)))
defer span.End()

ch := r.flight.DoChan(actorID, func() (interface{}, error) {
// We detach the context from the first caller using a fixed background timeout.
// This guarantees that if Caller 1 disconnects or times out, the underlying
Expand Down
44 changes: 33 additions & 11 deletions cmd/atenet/internal/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ import (
"time"

"github.com/spf13/cobra"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -78,6 +81,9 @@ type RouterConfig struct {
EnvoyCertPath string
LogLevel string
MetricsAddr string
// OtlpCollectorAddress is the host:port of the OTLP gRPC collector that
// Envoy reports tracing spans to. Empty disables Envoy-side tracing.
OtlpCollectorAddress string
}

// RouterServer instantiates and coordinates runtime threads executing system modules.
Expand All @@ -96,7 +102,6 @@ type RouterServer struct {
func NewRouterServer(cfg RouterConfig) (*RouterServer, error) {
var k8sClient client.Client
var clientset kubernetes.Interface
var err error

if cfg.TemplatesFile == "" {
k8sCfg, err := config.GetConfig()
Expand Down Expand Up @@ -125,14 +130,6 @@ func NewRouterServer(cfg RouterConfig) (*RouterServer, error) {
}
}

conn, err := grpc.NewClient(cfg.AteapiAddr, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})))
if err != nil {
return nil, fmt.Errorf("failed to establish grpc channel to ateapi client: %w", err)
}
slog.Info("Connecting to ateapi", slog.String("address", cfg.AteapiAddr))

apiClient := ateapipb.NewControlClient(conn)

var store atStore
if cfg.TemplatesFile != "" {
store = newFileATStore(cfg.TemplatesFile)
Expand All @@ -144,7 +141,6 @@ func NewRouterServer(cfg RouterConfig) (*RouterServer, error) {
cfg: cfg,
k8sClient: k8sClient,
clientset: clientset,
apiClient: apiClient,
atStore: store,
}, nil
}
Expand Down Expand Up @@ -173,6 +169,18 @@ func (s *RouterServer) Run(ctx context.Context) error {
}
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: level})))

// Tracing must be initialized before constructing the ateapi gRPC client
// below, because otelgrpc.NewClientHandler captures the global
// TracerProvider at construction time.
tp, err := serverboot.InitTracing(ctx, serverboot.TracingOptions{
ServiceName: routerServiceName,
Sampler: sdktrace.ParentBased(sdktrace.NeverSample()),
})
if err != nil {
return fmt.Errorf("failed to initialize tracing: %w", err)
}
defer serverboot.ShutdownProvider("TracerProvider", tp.Shutdown)

mp, err := serverboot.InitMetrics(ctx, routerServiceName)
if err != nil {
return fmt.Errorf("failed to initialize metrics: %w", err)
Expand All @@ -181,12 +189,26 @@ func (s *RouterServer) Run(ctx context.Context) error {

go serverboot.StartMetricsServer(ctx, serverboot.MetricsServerOptions{Addr: s.cfg.MetricsAddr})

conn, err := grpc.NewClient(
s.cfg.AteapiAddr,
grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
)
if err != nil {
return fmt.Errorf("failed to establish grpc channel to ateapi client: %w", err)
}
slog.InfoContext(ctx, "Connecting to ateapi", slog.String("address", s.cfg.AteapiAddr))
s.apiClient = ateapipb.NewControlClient(conn)

slog.InfoContext(ctx, "Starting substrate router subsystem", slog.Bool("standalone", s.cfg.Standalone))

g, ctx := errgroup.WithContext(ctx)

xdsSrv := NewXdsServer(s.cfg.XdsPort)
xdsSrv.SetConfig(s.cfg.HttpPort, s.cfg.ExtprocPort, s.cfg.ExtprocAddr)
if err := xdsSrv.SetOtlpCollector(s.cfg.OtlpCollectorAddress); err != nil {
return fmt.Errorf("configure OTLP collector: %w", err)
}

var certContent, keyContent string
if s.cfg.EnvoyCertPath == "" {
Expand Down Expand Up @@ -258,7 +280,7 @@ func (s *RouterServer) Run(ctx context.Context) error {
mux.HandleFunc("/statusz", s.handleStatusz)

httpServer := &http.Server{
Handler: mux,
Handler: otelhttp.NewHandler(mux, "/"),
}

g.Go(func() error {
Expand Down
6 changes: 5 additions & 1 deletion cmd/atenet/internal/router/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/spf13/pflag"
"go.opentelemetry.io/otel"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -171,7 +172,10 @@ func (s *RouterServer) getRouterIP(ctx context.Context) string {
}

func (s *RouterServer) handleStatusz(w http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithTimeout(req.Context(), 3*time.Second)
ctx, span := otel.Tracer(routerServiceName).Start(req.Context(), "handleStatusz")
defer span.End()

ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

routerIP := s.getRouterIP(ctx)
Expand Down
Loading
Loading