diff --git a/cmd/galactic-agent/main.go b/cmd/galactic-agent/main.go index 62b6dc3..e0c4d16 100644 --- a/cmd/galactic-agent/main.go +++ b/cmd/galactic-agent/main.go @@ -30,19 +30,13 @@ func newRootCommand() *cobra.Command { }, } - cmd.Flags().StringVar(&opts.MetricsAddr, "metrics-addr", ":8082", "Address to serve Prometheus metrics on") - cmd.Flags().StringVar(&opts.HealthAddr, "health-addr", ":8083", "Address to serve health/readiness probes on") cmd.Flags().StringVar(&opts.NodeName, "node-name", "", "Override node name (default: NODE_NAME env var)") - cmd.Flags().StringVar(&opts.Plane, "plane", "overlay", - "BGP plane label published on the BGPProvider (e.g. overlay, overlay-rr)") - cmd.Flags().BoolVar(&opts.GoBGPEnabled, "gobgp-enabled", false, - "Enable embedded GoBGP and publish BGPProvider") - cmd.Flags().IntVar(&opts.GoBGPAPIPort, "gobgp-api-port", 50051, - "Port for the embedded GoBGP gRPC API (cosmos dials this)") - cmd.Flags().StringVar(&opts.GoBGPLogLevel, "gobgp-log-level", "panic", - "GoBGP internal log level (debug, info, warn, error, panic)") - cmd.Flags().IntVar(&opts.GRPCHealthPort, "grpc-health-port", 8084, - "Port for the gRPC health service (used by Kubernetes readiness probes)") + cmd.Flags().StringVar(&opts.Role, "role", "overlay", + "Agent role published on the BGPProvider label galactic.io/role (overlay, overlay-rr)") + cmd.Flags().IntVar(&opts.Port, "port", 33438, + "Port for the gRPC server that cosmos uses to configure the BGP provider") + cmd.Flags().IntVar(&opts.HealthPort, "health-port", 5000, + "Port for the gRPC health server (Kubernetes liveness and readiness probes)") return cmd } diff --git a/deploy/containerlab/examples/nginx-vpc-test.yaml b/deploy/containerlab/examples/nginx-vpc-test.yaml deleted file mode 100644 index 17784bb..0000000 --- a/deploy/containerlab/examples/nginx-vpc-test.yaml +++ /dev/null @@ -1,91 +0,0 @@ -# Deploy an nginx pod attached to a VPC named "test". -# -# Apply order matters: VPC → VPCAttachment → NetworkAttachmentDefinition → Deployment. -# -# The "vpc" and "vpcattachment" fields in the NAD are Base62-encoded identifiers. -# When the cosmos VPC operator is running it generates and writes these automatically. -# Here they are set statically: vpc "2" (hex 000000000002) and vpcattachment "1". -# -# Prerequisites: -# - Multus CNI installed on the cluster -# - Galactic CNI plugin installed on nodes -# - VPC/VPCAttachment CRDs installed (vpc.miloapis.com) ---- -apiVersion: vpc.miloapis.com/v1alpha1 -kind: VPC -metadata: - name: test - namespace: default -spec: - networks: - - "10.2.0.0/16" ---- -apiVersion: vpc.miloapis.com/v1alpha1 -kind: VPCAttachment -metadata: - name: nginx-test - namespace: default -spec: - vpc: - name: test - interface: - name: galactic0 - addresses: - - "10.2.0.10/16" ---- -apiVersion: k8s.cni.cncf.io/v1 -kind: NetworkAttachmentDefinition -metadata: - name: galactic-test - namespace: default -spec: - config: | - { - "cniVersion": "0.3.1", - "name": "galactic-test", - "type": "galactic-cni", - "vpc": "2", - "vpcattachment": "1", - "srv6_locator": "2001:db8:ff01::/48", - "gobgp": { - "address": "127.0.0.1:50051" - }, - "ipam": { - "type": "static", - "addresses": [ - { "address": "10.2.0.10/16" } - ] - } - } ---- -apiVersion: apps/v1 -kind: Deployment -metadata: - name: nginx-test - namespace: default - labels: - app: nginx-test -spec: - replicas: 1 - selector: - matchLabels: - app: nginx-test - template: - metadata: - labels: - app: nginx-test - annotations: - k8s.v1.cni.cncf.io/networks: galactic-test - spec: - containers: - - name: nginx - image: nginx:stable - ports: - - containerPort: 80 - protocol: TCP - resources: - requests: - cpu: 50m - memory: 64Mi - limits: - memory: 128Mi diff --git a/deploy/containerlab/resources/bgp/dfw/bgpinstance.yaml b/deploy/containerlab/resources/bgp/dfw/bgpinstance.yaml index 20aea1d..4a44ac2 100644 --- a/deploy/containerlab/resources/bgp/dfw/bgpinstance.yaml +++ b/deploy/containerlab/resources/bgp/dfw/bgpinstance.yaml @@ -5,7 +5,7 @@ metadata: spec: providerSelector: matchLabels: - galactic.io/plane: overlay + galactic.io/role: overlay asNumber: 65000 listenPort: 1790 routerIDSource: Auto diff --git a/deploy/containerlab/resources/bgp/dfw/bgppeer.yaml b/deploy/containerlab/resources/bgp/dfw/bgppeer.yaml index b5c7cca..d22f65a 100644 --- a/deploy/containerlab/resources/bgp/dfw/bgppeer.yaml +++ b/deploy/containerlab/resources/bgp/dfw/bgppeer.yaml @@ -6,7 +6,7 @@ spec: instanceRef: overlay providerSelector: matchLabels: - galactic.io/plane: overlay + galactic.io/role: overlay address: "fc00:0:8::1" asNumber: 65000 remotePort: 1179 diff --git a/deploy/containerlab/resources/bgp/iad/bgpinstance-rr.yaml b/deploy/containerlab/resources/bgp/iad/bgpinstance-rr.yaml index 9307cb9..8ccc8f1 100644 --- a/deploy/containerlab/resources/bgp/iad/bgpinstance-rr.yaml +++ b/deploy/containerlab/resources/bgp/iad/bgpinstance-rr.yaml @@ -5,7 +5,7 @@ metadata: spec: providerSelector: matchLabels: - galactic.io/plane: overlay-rr + galactic.io/role: overlay-rr asNumber: 65000 routerIDSource: Manual routerID: "10.255.255.4" diff --git a/deploy/containerlab/resources/bgp/iad/bgpinstance.yaml b/deploy/containerlab/resources/bgp/iad/bgpinstance.yaml index 20aea1d..4a44ac2 100644 --- a/deploy/containerlab/resources/bgp/iad/bgpinstance.yaml +++ b/deploy/containerlab/resources/bgp/iad/bgpinstance.yaml @@ -5,7 +5,7 @@ metadata: spec: providerSelector: matchLabels: - galactic.io/plane: overlay + galactic.io/role: overlay asNumber: 65000 listenPort: 1790 routerIDSource: Auto diff --git a/deploy/containerlab/resources/bgp/iad/bgppeer-rr.yaml b/deploy/containerlab/resources/bgp/iad/bgppeer-rr.yaml index 0d7bdc8..973515b 100644 --- a/deploy/containerlab/resources/bgp/iad/bgppeer-rr.yaml +++ b/deploy/containerlab/resources/bgp/iad/bgppeer-rr.yaml @@ -6,7 +6,7 @@ spec: instanceRef: overlay-rr providerSelector: matchLabels: - galactic.io/plane: overlay-rr + galactic.io/role: overlay-rr address: "fc00:0:2::1" asNumber: 65000 passive: true @@ -23,7 +23,7 @@ spec: instanceRef: overlay-rr providerSelector: matchLabels: - galactic.io/plane: overlay-rr + galactic.io/role: overlay-rr address: "fc00:0:3::1" asNumber: 65000 passive: true @@ -40,7 +40,7 @@ spec: instanceRef: overlay-rr providerSelector: matchLabels: - galactic.io/plane: overlay-rr + galactic.io/role: overlay-rr address: "fc00:0:4::1" asNumber: 65000 passive: true diff --git a/deploy/containerlab/resources/bgp/iad/bgppeer.yaml b/deploy/containerlab/resources/bgp/iad/bgppeer.yaml index b5c7cca..d22f65a 100644 --- a/deploy/containerlab/resources/bgp/iad/bgppeer.yaml +++ b/deploy/containerlab/resources/bgp/iad/bgppeer.yaml @@ -6,7 +6,7 @@ spec: instanceRef: overlay providerSelector: matchLabels: - galactic.io/plane: overlay + galactic.io/role: overlay address: "fc00:0:8::1" asNumber: 65000 remotePort: 1179 diff --git a/deploy/containerlab/resources/bgp/sjc/bgpinstance.yaml b/deploy/containerlab/resources/bgp/sjc/bgpinstance.yaml index 20aea1d..4a44ac2 100644 --- a/deploy/containerlab/resources/bgp/sjc/bgpinstance.yaml +++ b/deploy/containerlab/resources/bgp/sjc/bgpinstance.yaml @@ -5,7 +5,7 @@ metadata: spec: providerSelector: matchLabels: - galactic.io/plane: overlay + galactic.io/role: overlay asNumber: 65000 listenPort: 1790 routerIDSource: Auto diff --git a/deploy/containerlab/resources/bgp/sjc/bgppeer.yaml b/deploy/containerlab/resources/bgp/sjc/bgppeer.yaml index b5c7cca..d22f65a 100644 --- a/deploy/containerlab/resources/bgp/sjc/bgppeer.yaml +++ b/deploy/containerlab/resources/bgp/sjc/bgppeer.yaml @@ -6,7 +6,7 @@ spec: instanceRef: overlay providerSelector: matchLabels: - galactic.io/plane: overlay + galactic.io/role: overlay address: "fc00:0:8::1" asNumber: 65000 remotePort: 1179 diff --git a/deploy/containerlab/resources/cosmos/daemonset-patch.yaml b/deploy/containerlab/resources/cosmos/daemonset-patch.yaml new file mode 100644 index 0000000..f8126ad --- /dev/null +++ b/deploy/containerlab/resources/cosmos/daemonset-patch.yaml @@ -0,0 +1,11 @@ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: cosmos + namespace: bgp-system +spec: + template: + spec: + containers: + - name: cosmos-operator + imagePullPolicy: Never diff --git a/deploy/containerlab/resources/cosmos/kustomization.yaml b/deploy/containerlab/resources/cosmos/kustomization.yaml new file mode 100644 index 0000000..20baf18 --- /dev/null +++ b/deploy/containerlab/resources/cosmos/kustomization.yaml @@ -0,0 +1,13 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +resources: + - ../cosmos-config/deploy/ +images: + - name: ghcr.io/milo-os/cosmos + newName: cosmos + newTag: latest +patches: + - path: daemonset-patch.yaml + target: + kind: DaemonSet + name: cosmos diff --git a/deploy/containerlab/resources/overlay/base/daemonset.yaml b/deploy/containerlab/resources/overlay/base/daemonset.yaml index bc1782f..0e11d9d 100644 --- a/deploy/containerlab/resources/overlay/base/daemonset.yaml +++ b/deploy/containerlab/resources/overlay/base/daemonset.yaml @@ -26,13 +26,9 @@ spec: image: galactic-agent:latest imagePullPolicy: Never args: - - --gobgp-enabled=true - - --gobgp-api-port=50051 - - --gobgp-log-level=debug - - --metrics-addr=:8082 - - --health-addr=:8083 - - --grpc-health-port=8084 - - --plane=overlay + - --port=33438 + - --health-port=5000 + - --role=overlay env: - name: NODE_NAME valueFrom: @@ -43,14 +39,13 @@ spec: add: - NET_ADMIN livenessProbe: - httpGet: - path: /healthz - port: 8083 + grpc: + port: 5000 initialDelaySeconds: 15 periodSeconds: 20 readinessProbe: - httpGet: - path: /readyz - port: 8083 + grpc: + port: 5000 + service: readyz initialDelaySeconds: 5 periodSeconds: 10 diff --git a/deploy/containerlab/resources/overlay/iad/rr/daemonset-patch.yaml b/deploy/containerlab/resources/overlay/iad/rr/daemonset-patch.yaml index 4dc6773..411bb1b 100644 --- a/deploy/containerlab/resources/overlay/iad/rr/daemonset-patch.yaml +++ b/deploy/containerlab/resources/overlay/iad/rr/daemonset-patch.yaml @@ -27,18 +27,13 @@ spec: containers: - name: galactic-agent args: - - --gobgp-enabled=true - - --gobgp-api-port=50052 - - --gobgp-log-level=debug - - --metrics-addr=:9085 - - --health-addr=:9086 - - --grpc-health-port=9087 - - --plane=overlay-rr + - --port=33440 + - --health-port=5000 + - --role=overlay-rr livenessProbe: - httpGet: - path: /healthz - port: 9086 + grpc: + port: 5000 readinessProbe: - httpGet: - path: /readyz - port: 9086 + grpc: + port: 5000 + service: readyz diff --git a/deploy/containerlab/scripts/install-overlay.sh b/deploy/containerlab/scripts/install-overlay.sh index 5c60a75..c0b33b8 100755 --- a/deploy/containerlab/scripts/install-overlay.sh +++ b/deploy/containerlab/scripts/install-overlay.sh @@ -21,12 +21,9 @@ deploy_cosmos() { echo "Deploying cosmos operator to ${node}..." # Copy the entire config/ directory so that the kustomization's ../crd reference resolves docker cp "${COSMOS_DIR}/config" "${node}:/galactic/resources/cosmos-config/" - # Use locally built image; patch imagePullPolicy to Never - docker exec "${node}" kubectl apply -k /galactic/resources/cosmos-config/deploy/ - docker exec "${node}" kubectl patch daemonset bgp \ - -n bgp-system \ - --type='json' \ - -p='[{"op":"replace","path":"/spec/template/spec/containers/0/image","value":"cosmos:latest"},{"op":"replace","path":"/spec/template/spec/containers/0/imagePullPolicy","value":"Never"}]' + # Copy the local overlay (image + imagePullPolicy overrides for Kind) + docker cp "${RESOURCES_DIR}/cosmos" "${node}:/galactic/resources/cosmos-overlay/" + docker exec "${node}" kubectl apply -k /galactic/resources/cosmos-overlay/ } apply_overlay dfw-control-plane dfw @@ -37,10 +34,4 @@ deploy_cosmos dfw-control-plane deploy_cosmos sjc-control-plane deploy_cosmos iad-control-plane -# iad hosts the overlay-rr BGPInstance (route reflector), which requires infra role. -echo "Patching cosmos clusterRole to infra on iad-control-plane..." -docker exec iad-control-plane kubectl patch configmap cosmos-config \ - -n cosmos-system --type=merge -p '{"data":{"clusterRole":"infra"}}' -docker exec iad-control-plane kubectl rollout restart daemonset bgp -n bgp-system - echo "Done." diff --git a/deploy/galactic-agent/daemonset.yaml b/deploy/galactic-agent/daemonset.yaml index a2a5222..150b753 100644 --- a/deploy/galactic-agent/daemonset.yaml +++ b/deploy/galactic-agent/daemonset.yaml @@ -22,37 +22,29 @@ spec: - name: galactic-agent image: ghcr.io/datum-cloud/galactic:latest args: - - --gobgp-enabled=true - - --gobgp-api-port=50051 - - --gobgp-log-level=panic - - --metrics-addr=:8082 - - --health-addr=:8083 - - --grpc-health-port=8084 + - --port=33438 + - --health-port=5000 env: - name: NODE_NAME valueFrom: fieldRef: fieldPath: spec.nodeName ports: - - name: metrics - containerPort: 8082 + - name: grpc + containerPort: 33438 protocol: TCP - - name: healthz - containerPort: 8083 - protocol: TCP - - name: grpc-health - containerPort: 8084 + - name: health + containerPort: 5000 protocol: TCP livenessProbe: - httpGet: - path: /healthz - port: healthz + grpc: + port: 5000 initialDelaySeconds: 10 periodSeconds: 30 readinessProbe: - httpGet: - path: /readyz - port: healthz + grpc: + port: 5000 + service: readyz initialDelaySeconds: 5 periodSeconds: 10 securityContext: diff --git a/docs/agent-startup.md b/docs/agent-startup.md index 4d01f1d..d048659 100644 --- a/docs/agent-startup.md +++ b/docs/agent-startup.md @@ -6,12 +6,11 @@ sequenceDiagram participant GoBGP participant Kubernetes - Agent->>Agent: register Prometheus metrics - Agent->>Agent: start gRPC health server (NOT_SERVING) + Agent->>Agent: start health gRPC server (--health-port, liveness SERVING immediately) + Agent->>Agent: start provider gRPC server (--port, BGPProviderService only) Agent->>GoBGP: start embedded server - Agent->>GoBGP: WaitReady — poll gRPC API port (30s timeout) - Agent->>Kubernetes: EnsureGoBGPProvider (create/update BGPProvider CR) - Agent->>Agent: mark gRPC health SERVING - Agent->>Agent: mgr.Start — controller-runtime loop - Note over Agent: on shutdown: DeleteGoBGPProvider, GracefulStop gRPC + Agent->>Kubernetes: EnsureGoBGPProvider (create/update BGPProvider CR with --port address) + Agent->>GoBGP: WaitReady — poll in-process API (30s timeout) + Agent->>Agent: mark readyz SERVING + Note over Agent: on shutdown: mark readyz NOT_SERVING, GracefulStop both gRPC servers ``` diff --git a/go.mod b/go.mod index e20c2c2..4326f96 100644 --- a/go.mod +++ b/go.mod @@ -8,13 +8,11 @@ require ( github.com/kenshaw/baseconv v0.1.1 github.com/lorenzosaino/go-sysctl v0.3.1 github.com/osrg/gobgp/v4 v4.6.0 - github.com/prometheus/client_golang v1.23.2 github.com/spf13/cobra v1.10.2 github.com/vishvananda/netlink v1.3.2-0.20260610182031-c05a276ed0e0 - go.miloapis.com/cosmos v0.0.0-20260615212649-d08f7867c312 + go.miloapis.com/cosmos v0.0.0-20260616211804-7c3afe69d1e7 golang.org/x/sys v0.46.0 google.golang.org/grpc v1.81.1 - k8s.io/api v0.36.0 k8s.io/apimachinery v0.36.1 k8s.io/client-go v0.36.0 sigs.k8s.io/controller-runtime v0.24.1 @@ -34,7 +32,6 @@ require ( github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/gaissmai/bart v0.26.1 // indirect github.com/go-logr/logr v1.4.3 // indirect - github.com/go-logr/zapr v1.3.0 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.23.0 // indirect @@ -54,6 +51,7 @@ require ( github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/client_golang v1.23.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.67.5 // indirect github.com/prometheus/procfs v0.19.2 // indirect @@ -68,7 +66,6 @@ require ( github.com/vishvananda/netns v0.0.5 // indirect github.com/x448/float16 v0.8.4 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.1 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/exp/typeparams v0.0.0-20220613132600-b0d781184e0d // indirect @@ -89,6 +86,7 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect honnef.co/go/tools v0.3.2 // indirect + k8s.io/api v0.36.0 // indirect k8s.io/apiextensions-apiserver v0.36.0 // indirect k8s.io/klog/v2 v2.140.0 // indirect k8s.io/kube-openapi v0.0.0-20260317180543-43fb72c5454a // indirect diff --git a/go.sum b/go.sum index ba71666..8de5196 100644 --- a/go.sum +++ b/go.sum @@ -164,8 +164,8 @@ github.com/vishvananda/netns v0.0.5 h1:DfiHV+j8bA32MFM7bfEunvT8IAqQ/NzSJHtcmW5zd github.com/vishvananda/netns v0.0.5/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= -go.miloapis.com/cosmos v0.0.0-20260615212649-d08f7867c312 h1:bIGAtAgfMg8vUni2Te5m4qT2be4mxz9Gr3OJGJK+k7M= -go.miloapis.com/cosmos v0.0.0-20260615212649-d08f7867c312/go.mod h1:c99/Iy0c7lwmI46d4brOQyRc1JKjzkl9xRIcU3hnHo4= +go.miloapis.com/cosmos v0.0.0-20260616211804-7c3afe69d1e7 h1:4OVGp1zHzreRr9fJe5jBsMFGWDLPqdjBKMfk3q5ysEI= +go.miloapis.com/cosmos v0.0.0-20260616211804-7c3afe69d1e7/go.mod h1:xOxb+AOoMVLwPePxvRNAQub9OCbF9zibIURgGowdY5U= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I= diff --git a/internal/agent/agent.go b/internal/agent/agent.go index f117c5e..531f114 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -6,14 +6,12 @@ import ( "fmt" "log/slog" "net" - "net/http" "os" "os/signal" "syscall" "time" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" "k8s.io/apimachinery/pkg/runtime" @@ -21,15 +19,12 @@ import ( clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/healthz" - "sigs.k8s.io/controller-runtime/pkg/log/zap" - metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + providerv1alpha1 "go.miloapis.com/cosmos/api/proto/bgp/provider/v1alpha1" providersv1alpha1 "go.miloapis.com/cosmos/api/providers/v1alpha1" "go.datum.net/galactic/internal/bootstrap" "go.datum.net/galactic/internal/gobgp" - "go.datum.net/galactic/internal/metrics" ) var scheme = runtime.NewScheme() @@ -41,14 +36,10 @@ func init() { // Options holds agent configuration. type Options struct { - MetricsAddr string - HealthAddr string - NodeName string - Plane string - GoBGPEnabled bool - GoBGPAPIPort int - GoBGPLogLevel string - GRPCHealthPort int + NodeName string + Role string + HealthPort int + Port int } // Run starts galactic-agent and blocks until ctx is cancelled or a signal arrives. @@ -60,11 +51,8 @@ func Run(ctx context.Context, opts Options) error { opts.NodeName = os.Getenv("NODE_NAME") } - metrics.MustRegister() - ctrl.SetLogger(zap.New(zap.UseDevMode(false))) - - if !opts.GoBGPEnabled { - slog.Warn("no providers enabled; agent is running but will not configure any BGP daemons") + if opts.Role != "overlay" && opts.Role != "overlay-rr" { + return fmt.Errorf("invalid --role %q: must be overlay or overlay-rr", opts.Role) } restCfg, err := ctrl.GetConfig() @@ -72,113 +60,78 @@ func Run(ctx context.Context, opts Options) error { return fmt.Errorf("get k8s config: %w", err) } - mgr, err := ctrl.NewManager(restCfg, ctrl.Options{ - Scheme: scheme, - HealthProbeBindAddress: opts.HealthAddr, - Metrics: metricsserver.Options{BindAddress: opts.MetricsAddr}, - }) + // Health gRPC server: Kubernetes probes connect here directly. + // "" (liveness) is SERVING immediately; "readyz" becomes SERVING once GoBGP is ready. + healthSrv := health.NewServer() + healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) + healthSrv.SetServingStatus("readyz", grpc_health_v1.HealthCheckResponse_NOT_SERVING) + + healthGRPCSrv := grpc.NewServer() + grpc_health_v1.RegisterHealthServer(healthGRPCSrv, healthSrv) + + healthLis, err := net.Listen("tcp", fmt.Sprintf(":%d", opts.HealthPort)) if err != nil { - return fmt.Errorf("new manager: %w", err) + return fmt.Errorf("listen health port %d: %w", opts.HealthPort, err) } - // gRPC health server — always started; reports NOT_SERVING until GoBGP is ready. - healthSrv := health.NewServer() - healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) + go func() { + if err := healthGRPCSrv.Serve(healthLis); err != nil { + slog.Error("health grpc server stopped", "err", err) + } + }() + defer healthGRPCSrv.GracefulStop() - grpcSrv := grpc.NewServer() - grpc_health_v1.RegisterHealthServer(grpcSrv, healthSrv) + // Provider gRPC server: BGPProviderService only (cosmos connects here). + providerSrv := grpc.NewServer() - grpcLis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", opts.GRPCHealthPort)) + providerAddr := fmt.Sprintf("localhost:%d", opts.Port) + providerLis, err := net.Listen("tcp", providerAddr) if err != nil { - return fmt.Errorf("listen grpc health port %d: %w", opts.GRPCHealthPort, err) + return fmt.Errorf("listen provider port %d: %w", opts.Port, err) } go func() { - if err := grpcSrv.Serve(grpcLis); err != nil { - slog.Error("grpc health server stopped", "err", err) + if err := providerSrv.Serve(providerLis); err != nil { + slog.Error("provider grpc server stopped", "err", err) } }() - defer grpcSrv.GracefulStop() - - if opts.GoBGPEnabled { - gobgpSrv := gobgp.New(gobgp.Config{ - APIPort: opts.GoBGPAPIPort, - LogLevel: opts.GoBGPLogLevel, - }) - - go func() { - if err := gobgpSrv.Start(ctx); err != nil { - slog.Error("gobgp server stopped", "err", err) - } - }() - - // Wait for GoBGP gRPC API to accept connections, then mark health SERVING. - go func() { - waitCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - if err := gobgpSrv.WaitReady(waitCtx); err != nil { - slog.Error("gobgp did not become ready", "err", err) - return - } - healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) - slog.Info("gobgp ready", "addr", gobgpSrv.Addr()) - }() - - // Bootstrap BGPProvider before starting the manager. - directClient, err := client.New(restCfg, client.Options{Scheme: scheme}) - if err != nil { - return fmt.Errorf("create bootstrap client: %w", err) - } - if opts.NodeName != "" { - if err := bootstrap.EnsureGoBGPProvider(ctx, directClient, opts.NodeName, opts.Plane, gobgpSrv.Addr()); err != nil { - return fmt.Errorf("bootstrap gobgp provider: %w", err) - } + defer providerSrv.GracefulStop() + + gobgpSrv := gobgp.New(gobgp.Config{}) + + // Register BGPProviderService so cosmos can configure GoBGP via gRPC. + providerv1alpha1.RegisterBGPProviderServiceServer(providerSrv, gobgp.NewProviderServer(gobgpSrv)) + + go func() { + if err := gobgpSrv.Start(ctx); err != nil { + slog.Error("gobgp server stopped", "err", err) } + }() - // Delete BGPProvider on clean shutdown using a fresh context. - defer func() { - deleteCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - if opts.NodeName != "" { - if err := bootstrap.DeleteGoBGPProvider(deleteCtx, directClient, opts.NodeName, opts.Plane); err != nil { - slog.Error("failed to delete BGPProvider on shutdown", "err", err) - } - } - healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) - }() - - // Readyz check: probe the gRPC health service we expose. - if err := mgr.AddReadyzCheck("gobgp", func(_ *http.Request) error { - conn, err := grpc.NewClient( - fmt.Sprintf("localhost:%d", opts.GRPCHealthPort), - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) - if err != nil { - return err - } - defer func() { _ = conn.Close() }() - hc := grpc_health_v1.NewHealthClient(conn) - probeCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - resp, err := hc.Check(probeCtx, &grpc_health_v1.HealthCheckRequest{}) - if err != nil { - return err - } - if resp.Status != grpc_health_v1.HealthCheckResponse_SERVING { - return fmt.Errorf("gobgp not serving: %s", resp.Status) - } - return nil - }); err != nil { - return fmt.Errorf("add gobgp readyz check: %w", err) + // Wait for GoBGP to initialize, then mark readyz SERVING. + go func() { + waitCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + if err := gobgpSrv.WaitReady(waitCtx); err != nil { + slog.Error("gobgp did not become ready", "err", err) + return } - } + healthSrv.SetServingStatus("readyz", grpc_health_v1.HealthCheckResponse_SERVING) + slog.Info("gobgp ready", "provider-addr", providerAddr) + }() - if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { - return fmt.Errorf("add healthz check: %w", err) + directClient, err := client.New(restCfg, client.Options{Scheme: scheme}) + if err != nil { + return fmt.Errorf("create bootstrap client: %w", err) } - if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { - return fmt.Errorf("add readyz check: %w", err) + if opts.NodeName != "" { + if err := bootstrap.EnsureGoBGPProvider(ctx, directClient, opts.NodeName, opts.Role, providerAddr); err != nil { + return fmt.Errorf("bootstrap gobgp provider: %w", err) + } } - return mgr.Start(ctx) + defer healthSrv.SetServingStatus("readyz", grpc_health_v1.HealthCheckResponse_NOT_SERVING) + + <-ctx.Done() + return nil } diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go index cbee624..4eefeae 100644 --- a/internal/bootstrap/bootstrap.go +++ b/internal/bootstrap/bootstrap.go @@ -15,32 +15,32 @@ import ( const ( labelNode = "bgp.miloapis.com/node" labelManagedBy = "galactic.io/managed-by" - labelPlane = "galactic.io/plane" + labelRole = "galactic.io/role" labelDaemon = "galactic.io/daemon" managedByValue = "galactic-agent" - defaultPlane = "overlay" + defaultRole = "overlay" ) -// providerName returns the BGPProvider resource name for this node and plane. -// The default "overlay" plane uses the short form for compatibility with existing -// deployments; additional planes append a suffix. -func providerName(nodeName, plane string) string { - if plane == "" || plane == defaultPlane { +// providerName returns the BGPProvider resource name for this node and role. +// The default "overlay" role uses the short form for compatibility with existing +// deployments; additional roles append a suffix. +func providerName(nodeName, role string) string { + if role == "" || role == defaultRole { return fmt.Sprintf("galactic-gobgp-%s", nodeName) } - return fmt.Sprintf("galactic-gobgp-%s-%s", nodeName, plane) + return fmt.Sprintf("galactic-gobgp-%s-%s", nodeName, role) } // EnsureGoBGPProvider creates or updates the BGPProvider resource for this node. // Idempotent — safe to call on every startup. -func EnsureGoBGPProvider(ctx context.Context, c client.Client, nodeName, plane, endpoint string) error { - if plane == "" { - plane = defaultPlane +func EnsureGoBGPProvider(ctx context.Context, c client.Client, nodeName, role, endpoint string) error { + if role == "" { + role = defaultRole } obj := &providersv1alpha1.BGPProvider{} - obj.Name = providerName(nodeName, plane) + obj.Name = providerName(nodeName, role) _, err := controllerutil.CreateOrUpdate(ctx, c, obj, func() error { if obj.Labels == nil { @@ -48,13 +48,11 @@ func EnsureGoBGPProvider(ctx context.Context, c client.Client, nodeName, plane, } obj.Labels[labelNode] = nodeName obj.Labels[labelManagedBy] = managedByValue - obj.Labels[labelPlane] = plane + obj.Labels[labelRole] = role obj.Labels[labelDaemon] = "gobgp" obj.Spec = providersv1alpha1.BGPProviderSpec{ - Type: "GoBGP", - GoBGP: &providersv1alpha1.GoBGPProviderConfig{ - Endpoint: endpoint, - }, + Type: "GoBGP", + Endpoint: endpoint, } return nil }) @@ -64,12 +62,12 @@ func EnsureGoBGPProvider(ctx context.Context, c client.Client, nodeName, plane, return nil } -// DeleteGoBGPProvider removes the BGPProvider resource for this node and plane. +// DeleteGoBGPProvider removes the BGPProvider resource for this node and role. // Idempotent — safe to call even if the resource does not exist. -func DeleteGoBGPProvider(ctx context.Context, c client.Client, nodeName, plane string) error { +func DeleteGoBGPProvider(ctx context.Context, c client.Client, nodeName, role string) error { obj := &providersv1alpha1.BGPProvider{ ObjectMeta: metav1.ObjectMeta{ - Name: providerName(nodeName, plane), + Name: providerName(nodeName, role), }, } if err := c.Delete(ctx, obj); client.IgnoreNotFound(err) != nil { diff --git a/internal/bootstrap/bootstrap_test.go b/internal/bootstrap/bootstrap_test.go index 4298356..ca5573f 100644 --- a/internal/bootstrap/bootstrap_test.go +++ b/internal/bootstrap/bootstrap_test.go @@ -6,19 +6,19 @@ import ( func TestProviderName(t *testing.T) { cases := []struct { - node string - plane string - want string + node string + role string + want string }{ - {"worker-1", defaultPlane, "galactic-gobgp-worker-1"}, + {"worker-1", defaultRole, "galactic-gobgp-worker-1"}, {"worker-1", "", "galactic-gobgp-worker-1"}, - {"node-abc", defaultPlane, "galactic-gobgp-node-abc"}, + {"node-abc", defaultRole, "galactic-gobgp-node-abc"}, {"iad-rr-worker", "overlay-rr", "galactic-gobgp-iad-rr-worker-overlay-rr"}, } for _, tc := range cases { - got := providerName(tc.node, tc.plane) + got := providerName(tc.node, tc.role) if got != tc.want { - t.Errorf("providerName(%q, %q) = %q, want %q", tc.node, tc.plane, got, tc.want) + t.Errorf("providerName(%q, %q) = %q, want %q", tc.node, tc.role, got, tc.want) } } } diff --git a/internal/cni/cni_test.go b/internal/cni/cni_test.go index 1a84592..52b58f1 100644 --- a/internal/cni/cni_test.go +++ b/internal/cni/cni_test.go @@ -33,15 +33,15 @@ func providerForNode(name, node string, extraLabels map[string]string) *provider lbls := map[string]string{ labelNode: node, labelDaemon: daemonGoBGP, - "galactic.io/plane": "overlay", + "galactic.io/role": "overlay", "galactic.io/managed-by": "galactic-agent", } maps.Copy(lbls, extraLabels) return &providersv1alpha1.BGPProvider{ ObjectMeta: metav1.ObjectMeta{Name: name, Labels: lbls}, Spec: providersv1alpha1.BGPProviderSpec{ - Type: "GoBGP", - GoBGP: &providersv1alpha1.GoBGPProviderConfig{Endpoint: "localhost:50051"}, + Type: "GoBGP", + Endpoint: "localhost:50051", }, } } diff --git a/internal/gobgp/provider.go b/internal/gobgp/provider.go new file mode 100644 index 0000000..9009ae9 --- /dev/null +++ b/internal/gobgp/provider.go @@ -0,0 +1,509 @@ +package gobgp + +import ( + "context" + "fmt" + "net/netip" + "strings" + "time" + + api "github.com/osrg/gobgp/v4/api" + "github.com/osrg/gobgp/v4/pkg/apiutil" + "github.com/osrg/gobgp/v4/pkg/packet/bgp" + gobgpserver "github.com/osrg/gobgp/v4/pkg/server" + providerv1alpha1 "go.miloapis.com/cosmos/api/proto/bgp/provider/v1alpha1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const ( + safiUnicast = "unicast" + globalPolicyTable = "global" +) + +// ProviderServer implements providerv1alpha1.BGPProviderServiceServer, translating +// cosmos BGP provider calls into direct GoBGP BgpServer API calls. +type ProviderServer struct { + providerv1alpha1.UnimplementedBGPProviderServiceServer + srv *Server +} + +// NewProviderServer creates a ProviderServer backed by the given gobgp.Server. +func NewProviderServer(srv *Server) *ProviderServer { + return &ProviderServer{srv: srv} +} + +// bgp returns the running BgpServer or an Unavailable error if not yet started. +func (p *ProviderServer) bgp() (*gobgpserver.BgpServer, error) { + b := p.srv.bgp.Load() + if b == nil { + return nil, status.Error(codes.Unavailable, "gobgp not started") + } + return b, nil +} + +// Ready returns OK once the GoBGP server is initialized and ready to accept calls. +func (p *ProviderServer) Ready(_ context.Context, _ *providerv1alpha1.ReadyRequest) (*providerv1alpha1.ReadyResponse, error) { + if _, err := p.bgp(); err != nil { + return nil, err + } + return &providerv1alpha1.ReadyResponse{}, nil +} + +// Capabilities reports the address families and features this provider supports. +func (p *ProviderServer) Capabilities(_ context.Context, _ *providerv1alpha1.CapabilitiesRequest) (*providerv1alpha1.CapabilitiesResponse, error) { + return &providerv1alpha1.CapabilitiesResponse{ + Capabilities: &providerv1alpha1.CapabilitySet{ + AddressFamilies: []*providerv1alpha1.AddressFamily{ + {Afi: "IPv4", Safi: "Unicast"}, + {Afi: "IPv6", Safi: "Unicast"}, + }, + RouteReflection: false, + Bfd: false, + }, + }, nil +} + +// ConfigureSpeaker applies global BGP speaker configuration by calling StartBgp. +// If BGP is already running, a fresh BgpServer is created because StopBgp in +// GoBGP v4 permanently terminates the Serve loop. +func (p *ProviderServer) ConfigureSpeaker(ctx context.Context, req *providerv1alpha1.ConfigureSpeakerRequest) (*providerv1alpha1.ConfigureSpeakerResponse, error) { + b, err := p.bgp() + if err != nil { + return nil, err + } + spec := req.GetSpec() + if spec == nil { + return nil, status.Error(codes.InvalidArgument, "spec is required") + } + + restarted := false + resp, getErr := b.GetBgp(ctx, &api.GetBgpRequest{}) + if getErr == nil && resp.Global != nil && resp.Global.Asn != 0 { + b, err = p.srv.Reconfigure() + if err != nil { + return nil, status.Errorf(codes.Internal, "reconfigure bgp server: %v", err) + } + restarted = true + } + + global := &api.Global{ + Asn: uint32(spec.GetAsNumber()), + RouterId: spec.GetRouterId(), + ListenPort: spec.GetListenPort(), + } + + if rr := spec.GetRouteReflector(); rr != nil && rr.GetClusterId() != "" { + global.RouteSelectionOptions = &api.RouteSelectionOptionsConfig{ + AlwaysCompareMed: spec.GetBestPath().GetAlwaysCompareMed(), + IgnoreAsPathLength: false, + ExternalCompareRouterId: spec.GetBestPath().GetCompareRouterId(), + } + } + + if err := b.StartBgp(ctx, &api.StartBgpRequest{Global: global}); err != nil { + return nil, status.Errorf(codes.Internal, "start bgp: %v", err) + } + + return &providerv1alpha1.ConfigureSpeakerResponse{Restarted: restarted}, nil +} + +// AddOrUpdatePeer adds or updates a BGP peer. +func (p *ProviderServer) AddOrUpdatePeer(ctx context.Context, req *providerv1alpha1.AddOrUpdatePeerRequest) (*providerv1alpha1.AddOrUpdatePeerResponse, error) { + b, err := p.bgp() + if err != nil { + return nil, err + } + spec := req.GetPeer() + if spec == nil { + return nil, status.Error(codes.InvalidArgument, "peer is required") + } + + peer := peerFromSpec(spec) + addErr := b.AddPeer(ctx, &api.AddPeerRequest{Peer: peer}) + if addErr != nil { + switch { + case strings.Contains(addErr.Error(), "can't overwrite the existing peer"): + if _, updateErr := b.UpdatePeer(ctx, &api.UpdatePeerRequest{Peer: peer}); updateErr != nil { + return nil, status.Errorf(codes.Internal, "update peer %s: %v", spec.GetAddress(), updateErr) + } + case strings.Contains(addErr.Error(), "bgp server hasn't started yet"): + return nil, status.Errorf(codes.Unavailable, "bgp speaker not yet configured") + default: + return nil, status.Errorf(codes.Internal, "add peer %s: %v", spec.GetAddress(), addErr) + } + } + + return &providerv1alpha1.AddOrUpdatePeerResponse{}, nil +} + +// DeletePeer removes a BGP peer. +func (p *ProviderServer) DeletePeer(ctx context.Context, req *providerv1alpha1.DeletePeerRequest) (*providerv1alpha1.DeletePeerResponse, error) { + b, err := p.bgp() + if err != nil { + return nil, err + } + if err := b.DeletePeer(ctx, &api.DeletePeerRequest{Address: req.GetAddress()}); err != nil { + if strings.Contains(err.Error(), "not found") { + return &providerv1alpha1.DeletePeerResponse{}, nil + } + return nil, status.Errorf(codes.Internal, "delete peer %s: %v", req.GetAddress(), err) + } + return &providerv1alpha1.DeletePeerResponse{}, nil +} + +// AddOrUpdateAdvertisement injects prefixes into the global RIB for advertisement. +func (p *ProviderServer) AddOrUpdateAdvertisement(_ context.Context, req *providerv1alpha1.AddOrUpdateAdvertisementRequest) (*providerv1alpha1.AddOrUpdateAdvertisementResponse, error) { + b, err := p.bgp() + if err != nil { + return nil, err + } + spec := req.GetAdvertisement() + if spec == nil { + return nil, status.Error(codes.InvalidArgument, "advertisement is required") + } + + for _, prefixStr := range spec.GetPrefixes() { + prefix, err := netip.ParsePrefix(prefixStr) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid prefix %q: %v", prefixStr, err) + } + path, err := buildPath(prefix, false) + if err != nil { + return nil, status.Errorf(codes.Internal, "build path for %q: %v", prefixStr, err) + } + if _, err := b.AddPath(apiutil.AddPathRequest{Paths: []*apiutil.Path{path}}); err != nil { + return nil, status.Errorf(codes.Internal, "add path %q: %v", prefixStr, err) + } + } + + return &providerv1alpha1.AddOrUpdateAdvertisementResponse{}, nil +} + +// DeleteAdvertisement withdraws a prefix from the global RIB. +func (p *ProviderServer) DeleteAdvertisement(_ context.Context, req *providerv1alpha1.DeleteAdvertisementRequest) (*providerv1alpha1.DeleteAdvertisementResponse, error) { + b, err := p.bgp() + if err != nil { + return nil, err + } + prefix, err := netip.ParsePrefix(req.GetPrefix()) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid prefix %q: %v", req.GetPrefix(), err) + } + path, err := buildPath(prefix, true) + if err != nil { + return nil, status.Errorf(codes.Internal, "build path for %q: %v", req.GetPrefix(), err) + } + if err := b.DeletePath(apiutil.DeletePathRequest{Paths: []*apiutil.Path{path}}); err != nil { + return nil, status.Errorf(codes.Internal, "delete path %q: %v", req.GetPrefix(), err) + } + return &providerv1alpha1.DeleteAdvertisementResponse{}, nil +} + +// AddOrUpdatePolicy creates or replaces a named routing policy in GoBGP. +func (p *ProviderServer) AddOrUpdatePolicy(ctx context.Context, req *providerv1alpha1.AddOrUpdatePolicyRequest) (*providerv1alpha1.AddOrUpdatePolicyResponse, error) { + b, err := p.bgp() + if err != nil { + return nil, err + } + spec := req.GetPolicy() + if spec == nil { + return nil, status.Error(codes.InvalidArgument, "policy is required") + } + + if err := upsertPolicy(ctx, b, spec); err != nil { + return nil, status.Errorf(codes.Internal, "upsert policy %q: %v", spec.GetName(), err) + } + return &providerv1alpha1.AddOrUpdatePolicyResponse{}, nil +} + +// DeletePolicy removes a named routing policy from GoBGP. +func (p *ProviderServer) DeletePolicy(ctx context.Context, req *providerv1alpha1.DeletePolicyRequest) (*providerv1alpha1.DeletePolicyResponse, error) { + b, err := p.bgp() + if err != nil { + return nil, err + } + name := req.GetPolicyName() + + // Remove policy assignments (both directions), then the policy itself. + for _, dir := range []api.PolicyDirection{api.PolicyDirection_POLICY_DIRECTION_IMPORT, api.PolicyDirection_POLICY_DIRECTION_EXPORT} { + _ = b.DeletePolicyAssignment(ctx, &api.DeletePolicyAssignmentRequest{ + Assignment: &api.PolicyAssignment{ + Name: globalPolicyTable, + Direction: dir, + Policies: []*api.Policy{{Name: name}}, + }, + }) + } + _ = b.DeletePolicy(ctx, &api.DeletePolicyRequest{ + Policy: &api.Policy{Name: name}, + PreserveStatements: false, + All: true, + }) + return &providerv1alpha1.DeletePolicyResponse{}, nil +} + +// peerFromSpec converts a cosmos PeerSpec to a GoBGP api.Peer. +func peerFromSpec(spec *providerv1alpha1.PeerSpec) *api.Peer { + peer := &api.Peer{ + Conf: &api.PeerConf{ + NeighborAddress: spec.GetAddress(), + PeerAsn: uint32(spec.GetAsNumber()), + AllowOwnAsn: uint32(spec.GetAllowAsIn()), + }, + } + + for _, af := range spec.GetFamilies() { + peer.AfiSafis = append(peer.AfiSafis, &api.AfiSafi{ + Config: &api.AfiSafiConfig{Family: familyFromSpec(af)}, + }) + } + + if t := spec.GetTimers(); t != nil && (t.GetHoldTime() > 0 || t.GetKeepalive() > 0) { + peer.Timers = &api.Timers{ + Config: &api.TimersConfig{ + HoldTime: uint64(t.GetHoldTime()), + KeepaliveInterval: uint64(t.GetKeepalive()), + }, + } + } + + if spec.GetRouteReflectorClient() { + peer.RouteReflector = &api.RouteReflector{RouteReflectorClient: true} + } + + if spec.GetPassive() { + peer.Transport = &api.Transport{PassiveMode: true} + } + if spec.GetRemotePort() > 0 { + if peer.Transport == nil { + peer.Transport = &api.Transport{} + } + peer.Transport.RemotePort = uint32(spec.GetRemotePort()) + } + + if spec.EbgpMultihop != nil && *spec.EbgpMultihop > 0 { + peer.EbgpMultihop = &api.EbgpMultihop{ + Enabled: true, + MultihopTtl: uint32(*spec.EbgpMultihop), + } + } + + if spec.TtlSecurity != nil && *spec.TtlSecurity > 0 { + peer.TtlSecurity = &api.TtlSecurity{ + Enabled: true, + TtlMin: uint32(*spec.TtlSecurity), + } + } + + if spec.GetPassword() != "" { + peer.Conf.AuthPassword = spec.GetPassword() + } + + return peer +} + +// familyFromSpec maps a cosmos AddressFamily to a GoBGP api.Family. +func familyFromSpec(af *providerv1alpha1.AddressFamily) *api.Family { + f := &api.Family{} + switch strings.ToLower(af.GetAfi()) { + case "ipv4", "ip": + f.Afi = api.Family_AFI_IP + case "ipv6", "ip6": + f.Afi = api.Family_AFI_IP6 + case "l2vpn": + f.Afi = api.Family_AFI_L2VPN + } + switch strings.ToLower(af.GetSafi()) { + case safiUnicast: + f.Safi = api.Family_SAFI_UNICAST + case "multicast": + f.Safi = api.Family_SAFI_MULTICAST + case "mpls-vpn", "vpn": + f.Safi = api.Family_SAFI_MPLS_VPN + case "evpn": + f.Safi = api.Family_SAFI_EVPN + } + return f +} + +// buildPath constructs an apiutil.Path for a CIDR prefix. +func buildPath(prefix netip.Prefix, withdrawal bool) (*apiutil.Path, error) { + prefix = prefix.Masked() + nlri, err := bgp.NewIPAddrPrefix(prefix) + if err != nil { + return nil, fmt.Errorf("create NLRI: %w", err) + } + + family := bgp.RF_IPv4_UC + if prefix.Addr().Is6() { + family = bgp.RF_IPv6_UC + } + + var attrs []bgp.PathAttributeInterface + if !withdrawal { + nh, err := bgp.NewPathAttributeNextHop(netip.MustParseAddr("0.0.0.0")) + if err != nil { + return nil, fmt.Errorf("create nexthop attr: %w", err) + } + attrs = []bgp.PathAttributeInterface{ + bgp.NewPathAttributeOrigin(bgp.BGP_ORIGIN_ATTR_TYPE_IGP), + nh, + } + } + + return &apiutil.Path{ + Family: family, + Nlri: nlri, + Attrs: attrs, + Withdrawal: withdrawal, + Age: time.Now().Unix(), + }, nil +} + +// upsertPolicy creates or replaces a policy and its defined sets in GoBGP. +func upsertPolicy(ctx context.Context, b *gobgpserver.BgpServer, spec *providerv1alpha1.PolicySpec) error { + allStmts := append(spec.GetImportStatements(), spec.GetExportStatements()...) + + // Collect and create unique prefix defined sets. + prefixSetsSeen := map[string]bool{} + for _, stmt := range allStmts { + cond := stmt.GetConditions() + if cond == nil { + continue + } + for _, setName := range cond.GetPrefixSets() { + if prefixSetsSeen[setName] { + continue + } + prefixSetsSeen[setName] = true + if err := b.AddDefinedSet(ctx, &api.AddDefinedSetRequest{ + DefinedSet: &api.DefinedSet{ + DefinedType: api.DefinedType_DEFINED_TYPE_PREFIX, + Name: setName, + }, + Replace: true, + }); err != nil { + return fmt.Errorf("add prefix set %q: %w", setName, err) + } + } + if cs := cond.GetCommunitySet(); cs != "" && !prefixSetsSeen["community:"+cs] { + prefixSetsSeen["community:"+cs] = true + if err := b.AddDefinedSet(ctx, &api.AddDefinedSetRequest{ + DefinedSet: &api.DefinedSet{ + DefinedType: api.DefinedType_DEFINED_TYPE_COMMUNITY, + Name: cs, + }, + Replace: true, + }); err != nil { + return fmt.Errorf("add community set %q: %w", cs, err) + } + } + } + + // Build GoBGP statements. + importStmts := buildStatements(spec.GetImportStatements()) + exportStmts := buildStatements(spec.GetExportStatements()) + + // Add/replace the policy with all statements. + allGoBGPStmts := append(importStmts, exportStmts...) + if err := b.AddPolicy(ctx, &api.AddPolicyRequest{ + Policy: &api.Policy{ + Name: spec.GetName(), + Statements: allGoBGPStmts, + }, + ReferExistingStatements: false, + }); err != nil { + return fmt.Errorf("add policy: %w", err) + } + + // Assign policy to global RIB for import and export. + if len(importStmts) > 0 { + if err := b.AddPolicyAssignment(ctx, &api.AddPolicyAssignmentRequest{ + Assignment: &api.PolicyAssignment{ + Name: globalPolicyTable, + Direction: api.PolicyDirection_POLICY_DIRECTION_IMPORT, + Policies: []*api.Policy{{Name: spec.GetName()}}, + DefaultAction: api.RouteAction_ROUTE_ACTION_ACCEPT, + }, + }); err != nil { + return fmt.Errorf("assign import policy: %w", err) + } + } + if len(exportStmts) > 0 { + if err := b.AddPolicyAssignment(ctx, &api.AddPolicyAssignmentRequest{ + Assignment: &api.PolicyAssignment{ + Name: globalPolicyTable, + Direction: api.PolicyDirection_POLICY_DIRECTION_EXPORT, + Policies: []*api.Policy{{Name: spec.GetName()}}, + DefaultAction: api.RouteAction_ROUTE_ACTION_ACCEPT, + }, + }); err != nil { + return fmt.Errorf("assign export policy: %w", err) + } + } + + return nil +} + +// buildStatements converts cosmos PolicyStatements to GoBGP api.Statements. +func buildStatements(stmts []*providerv1alpha1.PolicyStatement) []*api.Statement { + out := make([]*api.Statement, 0, len(stmts)) + for _, s := range stmts { + gs := &api.Statement{Name: s.GetName()} + + if cond := s.GetConditions(); cond != nil { + gs.Conditions = &api.Conditions{} + if sets := cond.GetPrefixSets(); len(sets) > 0 { + gs.Conditions.PrefixSet = &api.MatchSet{ + Type: api.MatchSet_TYPE_ANY, + Name: sets[0], + } + } + if cs := cond.GetCommunitySet(); cs != "" { + gs.Conditions.CommunitySet = &api.MatchSet{ + Type: api.MatchSet_TYPE_ANY, + Name: cs, + } + } + } + + if act := s.GetActions(); act != nil { + gs.Actions = &api.Actions{} + switch strings.ToLower(act.GetRouteDisposition()) { + case "accept": + gs.Actions.RouteAction = api.RouteAction_ROUTE_ACTION_ACCEPT + case "reject": + gs.Actions.RouteAction = api.RouteAction_ROUTE_ACTION_REJECT + } + if sc := act.GetSetCommunity(); sc != nil && len(sc.GetCommunities()) > 0 { + communityType := api.CommunityAction_TYPE_ADD + switch strings.ToLower(sc.GetMethod()) { + case "replace": + communityType = api.CommunityAction_TYPE_REPLACE + case "remove": + communityType = api.CommunityAction_TYPE_REMOVE + } + gs.Actions.Community = &api.CommunityAction{ + Type: communityType, + Communities: sc.GetCommunities(), + } + } + if act.SetLocalPreference != nil { + gs.Actions.LocalPref = &api.LocalPrefAction{Value: uint32(*act.SetLocalPreference)} + } + if act.SetMed != nil { + gs.Actions.Med = &api.MedAction{ + Type: api.MedAction_TYPE_REPLACE, + Value: int64(*act.SetMed), + } + } + if nh := act.GetSetNextHop(); nh != "" { + gs.Actions.Nexthop = &api.NexthopAction{Address: nh} + } + } + + out = append(out, gs) + } + return out +} diff --git a/internal/gobgp/server.go b/internal/gobgp/server.go index f11108b..443a596 100644 --- a/internal/gobgp/server.go +++ b/internal/gobgp/server.go @@ -6,8 +6,7 @@ import ( "fmt" "io" "log/slog" - "net" - "time" + "sync/atomic" gobgpserver "github.com/osrg/gobgp/v4/pkg/server" ) @@ -16,8 +15,6 @@ const defaultLogLevel = "panic" // Config holds configuration for the embedded GoBGP server. type Config struct { - // APIPort is the port the GoBGP gRPC API listens on. Cosmos dials this port. - APIPort int // LogLevel controls GoBGP's internal log verbosity. // Valid values: debug, info, warn, error, panic. Defaults to panic. LogLevel string @@ -25,61 +22,67 @@ type Config struct { // Server wraps an embedded GoBGP BgpServer. type Server struct { - cfg Config - bgp *gobgpserver.BgpServer + cfg Config + bgp atomic.Pointer[gobgpserver.BgpServer] + ready chan struct{} } // New creates a Server with the given config. Call Start to run it. func New(cfg Config) *Server { - if cfg.APIPort == 0 { - cfg.APIPort = 50051 - } if cfg.LogLevel == "" { cfg.LogLevel = defaultLogLevel } - return &Server{cfg: cfg} -} - -// Addr returns the gRPC API address the server listens on. -func (s *Server) Addr() string { - return fmt.Sprintf("localhost:%d", s.cfg.APIPort) + return &Server{ + cfg: cfg, + ready: make(chan struct{}), + } } // Start runs the embedded GoBGP server until ctx is cancelled. // It blocks until the server has stopped. func (s *Server) Start(ctx context.Context) error { - level := parseLogLevel(s.cfg.LogLevel) - levelVar := &slog.LevelVar{} - levelVar.Set(level) - logger := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: levelVar})) + b := s.newBgpServer() + s.bgp.Store(b) + close(s.ready) - s.bgp = gobgpserver.NewBgpServer( - gobgpserver.GrpcListenAddress(s.Addr()), - gobgpserver.LoggerOption(logger, levelVar), - ) - - go s.bgp.Serve() + go b.Serve() <-ctx.Done() - s.bgp.Stop() + if current := s.bgp.Load(); current != nil { + current.Stop() + } return nil } -// WaitReady blocks until the GoBGP gRPC API port accepts TCP connections or -// ctx is cancelled. Returns an error if ctx is cancelled before the port is ready. +// Reconfigure replaces the running BgpServer with a fresh instance. +// StopBgp in GoBGP v4 terminates the Serve loop, making the server permanently +// dead. Call this instead of StopBgp+StartBgp when reconfiguration is needed. +// The caller must call StartBgp on the returned server. +func (s *Server) Reconfigure() (*gobgpserver.BgpServer, error) { + if old := s.bgp.Load(); old != nil { + old.Stop() + } + b := s.newBgpServer() + s.bgp.Store(b) + go b.Serve() + return b, nil +} + +func (s *Server) newBgpServer() *gobgpserver.BgpServer { + level := parseLogLevel(s.cfg.LogLevel) + levelVar := &slog.LevelVar{} + levelVar.Set(level) + logger := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: levelVar})) + return gobgpserver.NewBgpServer(gobgpserver.LoggerOption(logger, levelVar)) +} + +// WaitReady blocks until the GoBGP server is initialized or ctx is cancelled. func (s *Server) WaitReady(ctx context.Context) error { - addr := s.Addr() - for { - conn, err := net.DialTimeout("tcp", addr, 100*time.Millisecond) - if err == nil { - _ = conn.Close() - return nil - } - select { - case <-ctx.Done(): - return fmt.Errorf("gobgp not ready at %s: %w", addr, ctx.Err()) - case <-time.After(50 * time.Millisecond): - } + select { + case <-s.ready: + return nil + case <-ctx.Done(): + return fmt.Errorf("gobgp not ready: %w", ctx.Err()) } } diff --git a/internal/gobgp/server_test.go b/internal/gobgp/server_test.go index 8c52f28..908eb53 100644 --- a/internal/gobgp/server_test.go +++ b/internal/gobgp/server_test.go @@ -1,39 +1,48 @@ package gobgp import ( + "context" "testing" + "time" ) func TestNew_Defaults(t *testing.T) { s := New(Config{}) - if s.cfg.APIPort != 50051 { - t.Errorf("default APIPort = %d, want 50051", s.cfg.APIPort) - } if s.cfg.LogLevel != defaultLogLevel { t.Errorf("default LogLevel = %q, want %q", s.cfg.LogLevel, defaultLogLevel) } } -func TestAddr(t *testing.T) { - s := New(Config{APIPort: 12345}) - if got := s.Addr(); got != "localhost:12345" { - t.Errorf("Addr() = %q, want localhost:12345", got) +func TestWaitReady(t *testing.T) { + s := New(Config{}) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + go func() { + startCtx, startCancel := context.WithCancel(context.Background()) + defer startCancel() + _ = s.Start(startCtx) + }() + + if err := s.WaitReady(ctx); err != nil { + t.Errorf("WaitReady returned error: %v", err) } } -func TestParseLogLevel(t *testing.T) { - cases := []struct { - input string - }{ - {"debug"}, - {"info"}, - {"warn"}, - {"error"}, - {defaultLogLevel}, - {""}, - {"unknown"}, +func TestWaitReady_Cancelled(t *testing.T) { + s := New(Config{}) + ctx, cancel := context.WithCancel(context.Background()) + cancel() // already cancelled + + err := s.WaitReady(ctx) + if err == nil { + t.Error("expected error for cancelled context, got nil") } +} + +func TestParseLogLevel(t *testing.T) { + cases := []string{"debug", "info", "warn", "error", defaultLogLevel, "", "unknown"} for _, tc := range cases { - _ = parseLogLevel(tc.input) + _ = parseLogLevel(tc) } } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go deleted file mode 100644 index 50faa43..0000000 --- a/internal/metrics/metrics.go +++ /dev/null @@ -1,50 +0,0 @@ -// Package metrics defines Prometheus metrics for galactic-agent. -package metrics - -import "github.com/prometheus/client_golang/prometheus" - -const namespace = "galactic_agent" - -var ( - ReconcileTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Name: "reconcile_total", - Help: "Total number of reconcile iterations per controller.", - }, []string{"controller"}) - - ReconcileErrors = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Name: "reconcile_errors_total", - Help: "Total number of reconcile errors per controller.", - }, []string{"controller"}) - - ProviderReady = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: namespace, - Name: "provider_ready", - Help: "1 when a backend provider is ready, 0 otherwise.", - }, []string{"provider", "daemon"}) - - BackendApplyDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: namespace, - Name: "backend_apply_duration_seconds", - Help: "Time spent applying desired state to a backend.", - Buckets: prometheus.DefBuckets, - }, []string{"backend"}) - - ConfigReloadTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Name: "config_reload_total", - Help: "Total number of FRR config reloads triggered.", - }, []string{"backend"}) -) - -// MustRegister registers all metrics with the default registry. -func MustRegister() { - prometheus.MustRegister( - ReconcileTotal, - ReconcileErrors, - ProviderReady, - BackendApplyDuration, - ConfigReloadTotal, - ) -}