diff --git a/config/rbac_downstream/role.yaml b/config/rbac_downstream/role.yaml
index c1880d33..d84759c8 100644
--- a/config/rbac_downstream/role.yaml
+++ b/config/rbac_downstream/role.yaml
@@ -182,3 +182,13 @@ rules:
   - envoypatchpolicies/status
   verbs:
   - get
+- apiGroups:
+  - coordination.k8s.io
+  resources:
+  - leases
+  verbs:
+  - create
+  - delete
+  - get
+  - patch
+  - update
diff --git a/internal/controller/iroh_dns_controller.go b/internal/controller/iroh_dns_controller.go
index feb11eeb..f2744fa9 100644
--- a/internal/controller/iroh_dns_controller.go
+++ b/internal/controller/iroh_dns_controller.go
@@ -10,6 +10,7 @@ import (
 	"slices"
 	"strconv"
 	"strings"
+	"time"
 
 	coordinationv1 "k8s.io/api/coordination/v1"
 	corev1 "k8s.io/api/core/v1"
@@ -58,6 +59,83 @@ const (
 	connectorReasonIrohPending = "Pending"
 )
 
+// irohDNSClaimLeaseHeartbeats is the number of upstream Lease heartbeat
+// intervals that must pass without renewal before a competitor may take over.
+// renewDownstreamLease multiplies Config.Connector.LeaseDurationSeconds by it
+// to obtain the downstream claim Lease duration.
+const irohDNSClaimLeaseHeartbeats = 3
+
+// claimStatus is the action evaluateClaim selects for applyClaim to carry out
+// against the downstream DNSRecordSet.
+type claimStatus int
+
+const (
+	claimStatusCreate   claimStatus = iota // no existing record
+	claimStatusOwner                       // we already hold the claim
+	claimStatusTakeOver                    // record claimed by another connector, owner dead
+	claimStatusDefer                       // record claimed by another connector, owner alive or unknown
+)
+
+// upstreamLeaseStatus is the result of checking the upstream Lease that
+// connector_controller maintains for the foreign connector in its own cluster.
+//
+// upstreamLeaseUnknown is the zero value, so a status variable that applyClaim
+// never assigns (it skips the upstream check for absent or self-owned records)
+// reads as Unknown.
+type upstreamLeaseStatus int
+
+const (
+	upstreamLeaseUnknown upstreamLeaseStatus = iota // cluster not accessible, or the Lease read failed
+	upstreamLeaseAlive                              // lease present and not expired
+	upstreamLeaseDead                               // lease absent or expired
+)
+
+// isLeaseExpired reports whether lease has passed its renewal deadline
+// (RenewTime + LeaseDurationSeconds; the deadline instant itself counts as
+// expired). A lease missing either field is reported as expired.
+func isLeaseExpired(lease coordinationv1.Lease, now time.Time) bool {
+	renewed, seconds := lease.Spec.RenewTime, lease.Spec.LeaseDurationSeconds
+	if renewed == nil || seconds == nil {
+		// Without both fields no deadline can be computed.
+		return true
+	}
+	validFor := time.Duration(*seconds) * time.Second
+	return !now.Before(renewed.Add(validFor))
+}
+
+// evaluateClaim is the pure decision function behind applyClaim: it maps the
+// downstream DNSRecordSet, the owner's upstream Lease status, and the
+// downstream claim Lease to the action the reconciler should take.
+//
+// Signals are consulted in priority order:
+//   - upstream lease: decisive whenever it is Alive or Dead
+//   - downstream lease: consulted only when upstream is Unknown
+func evaluateClaim(
+	connectorUID types.UID,
+	existing *dnsv1alpha1.DNSRecordSet,
+	upstream upstreamLeaseStatus,
+	claimLease *coordinationv1.Lease,
+	now time.Time,
+) claimStatus {
+	if existing == nil {
+		return claimStatusCreate
+	}
+	switch {
+	case existing.Labels[irohDNSClaimedByUIDLabel] == string(connectorUID):
+		return claimStatusOwner
+	case upstream == upstreamLeaseAlive:
+		return claimStatusDefer
+	case upstream == upstreamLeaseDead:
+		return claimStatusTakeOver
+	case claimLease == nil:
+		// No liveness signal at all: leave the record with its claimant.
+		return claimStatusDefer
+	case isLeaseExpired(*claimLease, now):
+		return claimStatusTakeOver
+	}
+	return claimStatusDefer
+}
+
 // allowedIrohControllerNames is the set of ConnectorClass.spec.controllerName
 // values for which we publish iroh DNS records.
Both names refer to the same
 // controller; "datum-connect" is the legacy name kept alive while older
@@ -84,6 +153,7 @@ type IrohDNSReconciler struct {
 // +kubebuilder:rbac:groups=networking.datumapis.com,resources=connectors/status,verbs=update;patch
 // +kubebuilder:rbac:groups=networking.datumapis.com,resources=connectors/finalizers,verbs=update
 // +kubebuilder:rbac:groups=networking.datumapis.com,resources=connectorclasses,verbs=get;list;watch
+// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;create;update;patch;delete
 
 func (r *IrohDNSReconciler) Reconcile(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) {
 	logger := log.FromContext(ctx, "cluster", req.ClusterName)
@@ -139,47 +209,141 @@ func (r *IrohDNSReconciler) Reconcile(ctx context.Context, req mcreconcile.Reque
 	return ctrl.Result{}, r.applyClaim(ctx, cl, &connector, desired)
 }
 
-// applyClaim implements the claim-then-write loop:
-//   - Get the DNSRecordSet at the deterministic z32-derived name.
-//   - Not found → Create with our claim. AlreadyExists means a sibling beat
-//     us; we re-fetch and continue.
-//   - Found, our claim → SSA refresh content.
-//   - Found, foreign claim → defer (no write) and surface a status
-//     condition naming the owner.
+// applyClaim implements the ownership arbitration loop:
+//   - Not found → Create with our claim + create downstream Lease.
+//   - Found, our claim → SSA refresh + renew downstream Lease.
+//   - Found, foreign claim + owner confirmed dead → force-claim.
+//   - Found, foreign claim + owner alive or no signal → defer.
+//
+// An absent downstream Lease (upstream inaccessible) results in Defer, not
+// TakeOver, so pre-protocol records are not immediately contested on upgrade.
+//
+// In every claiming branch the DNSRecordSet is written before the downstream
+// Lease; a Lease write failure is returned as-is after the record write.
+// Apart from the AlreadyExists race (which returns nil without touching
+// status), each successful path ends by calling setPublishedCondition.
+//
+// NOTE(review): the take-over write is a forced apply with no precondition on
+// the record observed above, so two competitors that both see a dead owner
+// could each claim and report ownership — confirm this race is acceptable.
 func (r *IrohDNSReconciler) applyClaim(ctx context.Context, cl cluster.Cluster, connector *networkingv1alpha1.Connector, desired *dnsv1alpha1.DNSRecordSet) error {
 	key := client.ObjectKeyFromObject(desired)
+	now := time.Now()
+
 	var existing dnsv1alpha1.DNSRecordSet
-	err := r.Downstream.GetClient().Get(ctx, key, &existing)
+	existingErr := r.Downstream.GetClient().Get(ctx, key, &existing)
+	if existingErr != nil && !apierrors.IsNotFound(existingErr) {
+		return fmt.Errorf("get DNSRecordSet: %w", existingErr)
+	}
+	// existingPtr stays nil when the record is absent; evaluateClaim reads nil as "create".
+	var existingPtr *dnsv1alpha1.DNSRecordSet
+	if existingErr == nil {
+		existingPtr = &existing
+	}
+
+	var claimLease *coordinationv1.Lease
+	var upstream upstreamLeaseStatus
+	// Liveness is only looked up for records claimed by another connector.
+	if existingPtr != nil && existingPtr.Labels[irohDNSClaimedByUIDLabel] != string(connector.UID) {
+		upstream = r.checkUpstreamClaimLease(ctx, existingPtr, now)
+		if upstream == upstreamLeaseUnknown {
+			// Upstream inaccessible — check downstream Lease as fallback.
+			var lease coordinationv1.Lease
+			err := r.Downstream.GetClient().Get(ctx, key, &lease)
+			if err != nil && !apierrors.IsNotFound(err) {
+				return fmt.Errorf("get downstream claim lease: %w", err)
+			}
+			if err == nil {
+				claimLease = &lease
+			}
+		}
+	}
 
-	switch {
-	case apierrors.IsNotFound(err):
+	switch evaluateClaim(connector.UID, existingPtr, upstream, claimLease, now) {
+	case claimStatusCreate:
 		if err := r.Downstream.GetClient().Create(ctx, desired); err != nil {
 			if !apierrors.IsAlreadyExists(err) {
 				return fmt.Errorf("create DNSRecordSet: %w", err)
 			}
-			// Sibling raced us. Refetch and fall through to the foreign-claim
-			// branch on next reconcile.
+			// Sibling raced us to the create; next reconcile will re-evaluate.
 			return nil
 		}
+		if err := r.renewDownstreamLease(ctx, connector, desired.Name); err != nil {
+			return err
+		}
 		return r.setPublishedCondition(ctx, cl, connector, metav1.ConditionTrue, connectorReasonIrohOwner, "Owns iroh DNS record.")
 
-	case err != nil:
-		return fmt.Errorf("get DNSRecordSet: %w", err)
-	}
+	case claimStatusOwner, claimStatusTakeOver:
+		// Owner refresh and take-over are the same forced server-side apply of desired.
+		if err := r.Downstream.GetClient().Patch(ctx, desired, client.Apply, client.FieldOwner(irohDNSFieldManager), client.ForceOwnership); err != nil {
+			return fmt.Errorf("apply DNSRecordSet: %w", err)
+		}
+		if err := r.renewDownstreamLease(ctx, connector, desired.Name); err != nil {
+			return err
+		}
+		return r.setPublishedCondition(ctx, cl, connector, metav1.ConditionTrue, connectorReasonIrohOwner, "Owns iroh DNS record.")
 
-	currentClaim := existing.Labels[irohDNSClaimedByUIDLabel]
-	if currentClaim != string(connector.UID) {
+	default: // claimStatusDefer
 		ownerCluster := decodeIrohClusterLabel(existing.Labels[irohDNSConnectorClusterLabel])
 		ownerRef := ownerCluster + "/" + existing.Labels[irohDNSConnectorNamespaceLabel] + "/" + existing.Labels[irohDNSConnectorNameLabel]
 		return r.setPublishedCondition(ctx, cl, connector, metav1.ConditionFalse, connectorReasonIrohDeferredToOwner,
-			fmt.Sprintf("iroh DNS record is owned by Connector %s (uid %s).", ownerRef, currentClaim))
+			fmt.Sprintf("iroh DNS record is owned by Connector %s (uid %s).", ownerRef, existing.Labels[irohDNSClaimedByUIDLabel]))
+	}
+}
+
+// renewDownstreamLease creates or refreshes the claim Lease in the downstream
+// cluster for the given DNSRecordSet name. The Lease is the liveness signal
+// that lets competing connectors (including cross-cluster ones) determine
+// whether the current claimant is still active.
+func (r *IrohDNSReconciler) renewDownstreamLease(ctx context.Context, connector *networkingv1alpha1.Connector, recordSetName string) error {
+	// Lease duration: the configured connector lease duration times irohDNSClaimLeaseHeartbeats.
+	ttlSeconds := r.Config.Connector.LeaseDurationSeconds * irohDNSClaimLeaseHeartbeats
+	renewedAt := metav1.NewMicroTime(time.Now())
+	holder := string(connector.UID)
+
+	var claim coordinationv1.Lease
+	claim.APIVersion = coordinationv1.SchemeGroupVersion.String()
+	claim.Kind = "Lease"
+	claim.Name = recordSetName
+	claim.Namespace = r.Config.Connector.Iroh.DNSZoneRef.Namespace
+	claim.Spec.HolderIdentity = &holder
+	claim.Spec.LeaseDurationSeconds = &ttlSeconds
+	claim.Spec.RenewTime = &renewedAt
+
+	// Server-side apply with forced ownership: the same call serves both the
+	// first write of the Lease and every later refresh of it.
+	applyOpts := []client.PatchOption{client.FieldOwner(irohDNSFieldManager), client.ForceOwnership}
+	err := r.Downstream.GetClient().Patch(ctx, &claim, client.Apply, applyOpts...)
+	if err != nil {
+		return fmt.Errorf("renew downstream claim lease: %w", err)
 	}
+	return nil
+}
 
-	// We own it. SSA the desired content.
-	if err := r.Downstream.GetClient().Patch(ctx, desired, client.Apply, client.FieldOwner(irohDNSFieldManager), client.ForceOwnership); err != nil {
-		return fmt.Errorf("apply DNSRecordSet: %w", err)
+// checkUpstreamClaimLease looks up the upstream Lease that connector_controller
+// maintains for the foreign connector identified by existing's claim labels.
+// Returns upstreamLeaseUnknown when the cluster is not registered with this
+// manager (cross-account or otherwise inaccessible).
+func (r *IrohDNSReconciler) checkUpstreamClaimLease(ctx context.Context, existing *dnsv1alpha1.DNSRecordSet, now time.Time) upstreamLeaseStatus {
+	key := client.ObjectKey{Namespace: existing.Labels[irohDNSConnectorNamespaceLabel], Name: existing.Labels[irohDNSConnectorNameLabel]}
+	if key.Namespace == "" || key.Name == "" {
+		// Owner not identifiable: a failed lookup would prove nothing, so never report Dead.
+		return upstreamLeaseUnknown
+	}
+	cl, err := r.mgr.GetCluster(ctx, decodeIrohClusterLabel(existing.Labels[irohDNSConnectorClusterLabel]))
+	if err != nil {
+		return upstreamLeaseUnknown // cluster could not be resolved by this manager
+	}
+	var lease coordinationv1.Lease
+	switch err := cl.GetClient().Get(ctx, key, &lease); {
+	case apierrors.IsNotFound(err):
+		return upstreamLeaseDead // the owner's Lease does not exist
+	case err != nil:
+		return upstreamLeaseUnknown // lookup failed for another reason: no verdict
+	}
+	if isLeaseExpired(lease, now) {
+		return upstreamLeaseDead
 	}
-	return r.setPublishedCondition(ctx, cl, connector, metav1.ConditionTrue, connectorReasonIrohOwner, "Owns iroh DNS record.")
+	return upstreamLeaseAlive
 }
 
 // handleDeletion releases the claim (if held) and removes the finalizer.
@@ -226,6 +379,15 @@ func (r *IrohDNSReconciler) releaseIfOwner(ctx context.Context, connector *netwo
 	if err := r.Downstream.GetClient().Delete(ctx, &existing); err != nil && !apierrors.IsNotFound(err) {
 		return fmt.Errorf("delete DNSRecordSet: %w", err)
 	}
+	// Release the downstream claim Lease only if we currently hold it.
+	var claimLease coordinationv1.Lease
+	if err := r.Downstream.GetClient().Get(ctx, key, &claimLease); err == nil {
+		if claimLease.Spec.HolderIdentity != nil && *claimLease.Spec.HolderIdentity == string(connector.UID) {
+			if err := r.Downstream.GetClient().Delete(ctx, &claimLease); err != nil && !apierrors.IsNotFound(err) {
+				return fmt.Errorf("delete downstream claim lease: %w", err)
+			}
+		}
+	}
 	return nil
 }
 
diff --git a/internal/controller/iroh_dns_controller_test.go b/internal/controller/iroh_dns_controller_test.go
index faded244..5c1c9909 100644
--- a/internal/controller/iroh_dns_controller_test.go
+++ b/internal/controller/iroh_dns_controller_test.go
@@ -3,10 +3,28 @@
 package controller
 
 import (
+	"context"
+	"fmt"
 	"testing"
+	"time"
 
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	coordinationv1 "k8s.io/api/coordination/v1"
+	apimeta "k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
+	k8sscheme "k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/utils/ptr"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+	"sigs.k8s.io/controller-runtime/pkg/cluster"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/log/zap"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
+	mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"
 
 	dnsv1alpha1 "go.miloapis.com/dns-operator/api/v1alpha1"
 
@@ -14,6 +32,23 @@ import (
 	"go.datum.net/network-services-operator/internal/config"
 )
 
+// newIrohTestScheme builds a fresh runtime.Scheme and registers, in order, the
+// client-go, coordination, networking, and DNS API types on it.
+func newIrohTestScheme(t *testing.T) *runtime.Scheme {
+	t.Helper()
+	scheme := runtime.NewScheme()
+	register := []func(*runtime.Scheme) error{
+		k8sscheme.AddToScheme,
+		coordinationv1.AddToScheme,
+		networkingv1alpha1.AddToScheme,
+		dnsv1alpha1.AddToScheme,
+	}
+	for _, addToScheme := range register {
+		require.NoError(t, addToScheme(scheme))
+	}
+	return scheme
+}
+
 // Real iroh public key from
iroh-base/src/key.rs SecretKey.public, chosen
 // because it has a known z32 form we can pin against.
 const (
@@ -377,3 +405,251 @@ func TestSortIrohAddresses_DoesNotMutateInput(t *testing.T) {
 		}
 	}
 }
+
+// irohMultiClusterManager is a test double for mcmanager.Manager that resolves
+// clusters from a fixed name-to-client map. The embedded Manager is left nil
+// by the test, so only GetCluster is safe to call.
+type irohMultiClusterManager struct {
+	mcmanager.Manager
+	clusters map[string]client.Client
+}
+
+// GetCluster returns a fakeCluster wrapping the client registered under name,
+// or an error when name is not in the map.
+func (m *irohMultiClusterManager) GetCluster(_ context.Context, name string) (cluster.Cluster, error) {
+	cl, ok := m.clusters[name]
+	if !ok {
+		return nil, fmt.Errorf("cluster %q not registered", name)
+	}
+	return &fakeCluster{cl: cl}, nil
+}
+
+// TestIrohDNSReconcile_ClaimArbitration runs Reconcile for a connector whose
+// DNSRecordSet name is either free or already claimed by a foreign connector,
+// varying which liveness signal is available (upstream Lease in a registered
+// foreign cluster, downstream claim Lease, or none), and checks the resulting
+// IrohDNSPublished condition plus, where set, the record and Lease holder.
+func TestIrohDNSReconcile_ClaimArbitration(t *testing.T) {
+	log.SetLogger(zap.New(zap.UseDevMode(true)))
+
+	const (
+		ourUID      = "aaaaaaaa-0000-0000-0000-000000000001"
+		upstreamUID = "bbbbbbbb-0000-0000-0000-000000000002"
+
+		dnsNamespace      = "datum-dns"
+		upstreamCluster   = "/foreign-project"
+		upstreamConnector = "foreign-connector"
+		upstreamNamespace = "default"
+	)
+
+	recordName := "iroh-" + testEndpointZ32
+
+	irohConnectorClass := &networkingv1alpha1.ConnectorClass{
+		ObjectMeta: metav1.ObjectMeta{Name: "datum-connect"},
+		Spec:       networkingv1alpha1.ConnectorClassSpec{ControllerName: "networking.datumapis.com/datum-connect"},
+	}
+
+	makeConnector := func() *networkingv1alpha1.Connector {
+		return &networkingv1alpha1.Connector{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:       "my-connector",
+				Namespace:  "default",
+				UID:        ourUID,
+				Finalizers: []string{irohDNSFinalizer},
+			},
+			Spec: networkingv1alpha1.ConnectorSpec{ConnectorClassName: "datum-connect"},
+			Status: networkingv1alpha1.ConnectorStatus{
+				ConnectionDetails: &networkingv1alpha1.ConnectorConnectionDetails{
+					Type: networkingv1alpha1.PublicKeyConnectorConnectionType,
+					PublicKey: &networkingv1alpha1.ConnectorConnectionDetailsPublicKey{
+						Id:        testEndpointHex,
+						HomeRelay: testRelayURL,
+					},
+				},
+			},
+		}
+	}
+
+	// makeExistingRecord returns a DNSRecordSet claimed by the foreign connector.
+	makeExistingRecord := func() *dnsv1alpha1.DNSRecordSet {
+		return &dnsv1alpha1.DNSRecordSet{
+			TypeMeta: metav1.TypeMeta{
+				APIVersion: dnsv1alpha1.GroupVersion.String(),
+				Kind:       "DNSRecordSet",
+			},
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      recordName,
+				Namespace: dnsNamespace,
+				Labels: map[string]string{
+					irohDNSClaimedByUIDLabel:       upstreamUID,
+					irohDNSConnectorClusterLabel:   encodeIrohClusterLabel(upstreamCluster),
+					irohDNSConnectorNamespaceLabel: upstreamNamespace,
+					irohDNSConnectorNameLabel:      upstreamConnector,
+				},
+			},
+		}
+	}
+
+	dur30 := int32(30)
+
+	makeUpstreamLease := func(renewedAgo time.Duration) *coordinationv1.Lease {
+		return &coordinationv1.Lease{
+			ObjectMeta: metav1.ObjectMeta{Name: upstreamConnector, Namespace: upstreamNamespace},
+			Spec: coordinationv1.LeaseSpec{
+				HolderIdentity:       ptr.To(upstreamUID),
+				LeaseDurationSeconds: &dur30,
+				RenewTime:            &metav1.MicroTime{Time: time.Now().Add(-renewedAgo)},
+			},
+		}
+	}
+
+	makeDownstreamLease := func(renewedAgo time.Duration) *coordinationv1.Lease {
+		return &coordinationv1.Lease{
+			ObjectMeta: metav1.ObjectMeta{Name: recordName, Namespace: dnsNamespace},
+			Spec: coordinationv1.LeaseSpec{
+				HolderIdentity:       ptr.To(upstreamUID),
+				LeaseDurationSeconds: &dur30,
+				RenewTime:            &metav1.MicroTime{Time: time.Now().Add(-renewedAgo)},
+			},
+		}
+	}
+
+	tests := []struct {
+		name                string
+		existingRecord      *dnsv1alpha1.DNSRecordSet
+		upstreamLease       *coordinationv1.Lease // in the foreign cluster (nil = cluster inaccessible or no lease)
+		downstreamLease     *coordinationv1.Lease // downstream claim Lease
+		foreignClusterKnown bool                  // whether the foreign cluster is registered in the manager
+		wantStatus          metav1.ConditionStatus
+		wantReason          string
+		wantOwnerUID        string // expected irohDNSClaimedByUIDLabel after reconcile; "" = don't check
+	}{
+		{
+			name:         "no existing record — connector creates and owns it",
+			wantStatus:   metav1.ConditionTrue,
+			wantReason:   connectorReasonIrohOwner,
+			wantOwnerUID: ourUID,
+		},
+		{
+			name:                "same-operator owner connector was deleted",
+			existingRecord:      makeExistingRecord(),
+			foreignClusterKnown: true,
+			wantStatus:          metav1.ConditionTrue,
+			wantReason:          connectorReasonIrohOwner,
+			wantOwnerUID:        ourUID,
+		},
+		{
+			name:                "same-operator owner whose upstream Lease expired",
+			existingRecord:      makeExistingRecord(),
+			foreignClusterKnown: true,
+			upstreamLease:       makeUpstreamLease(5 * time.Minute),
+			wantStatus:          metav1.ConditionTrue,
+			wantReason:          connectorReasonIrohOwner,
+			wantOwnerUID:        ourUID,
+		},
+		{
+			name:                "same-operator owner alive — upstream Lease active",
+			existingRecord:      makeExistingRecord(),
+			foreignClusterKnown: true,
+			upstreamLease:       makeUpstreamLease(0),
+			wantStatus:          metav1.ConditionFalse,
+			wantReason:          connectorReasonIrohDeferredToOwner,
+		},
+		// NOTE(review): same fixtures as "same-operator owner connector was
+		// deleted" above — confirm whether a distinct setup was intended.
+		{
+			name:                "same-operator owner — migration / upstream Lease absent",
+			existingRecord:      makeExistingRecord(),
+			foreignClusterKnown: true,
+			wantStatus:          metav1.ConditionTrue,
+			wantReason:          connectorReasonIrohOwner,
+			wantOwnerUID:        ourUID,
+		},
+		{
+			name:            "cross-account owner whose downstream Lease expired",
+			existingRecord:  makeExistingRecord(),
+			downstreamLease: makeDownstreamLease(5 * time.Minute),
+			wantStatus:      metav1.ConditionTrue,
+			wantReason:      connectorReasonIrohOwner,
+			wantOwnerUID:    ourUID,
+		},
+		{
+			name:            "cross-account owner alive — downstream Lease active",
+			existingRecord:  makeExistingRecord(),
+			downstreamLease: makeDownstreamLease(0),
+			wantStatus:      metav1.ConditionFalse,
+			wantReason:      connectorReasonIrohDeferredToOwner,
+		},
+		{
+			name:           "cross-account pre-protocol record — no downstream Lease",
+			existingRecord: makeExistingRecord(),
+			wantStatus:     metav1.ConditionFalse,
+			wantReason:     connectorReasonIrohDeferredToOwner,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			testScheme := newIrohTestScheme(t)
+			connector := makeConnector()
+
+			upstreamCl := fake.NewClientBuilder().
+				WithScheme(testScheme).
+				WithObjects(connector, irohConnectorClass).
+				WithStatusSubresource(connector).
+				Build()
+
+			foreignClusterBuilder := fake.NewClientBuilder().WithScheme(testScheme)
+			if tt.upstreamLease != nil {
+				foreignClusterBuilder = foreignClusterBuilder.WithObjects(tt.upstreamLease)
+			}
+			foreignClusterCl := foreignClusterBuilder.Build()
+
+			clusters := map[string]client.Client{testClusterName: upstreamCl}
+			if tt.foreignClusterKnown {
+				clusters[upstreamCluster] = foreignClusterCl
+			}
+			mgr := &irohMultiClusterManager{clusters: clusters}
+
+			downstreamBuilder := fake.NewClientBuilder().WithScheme(testScheme)
+			if tt.existingRecord != nil {
+				downstreamBuilder = downstreamBuilder.WithObjects(tt.existingRecord)
+			}
+			if tt.downstreamLease != nil {
+				downstreamBuilder = downstreamBuilder.WithObjects(tt.downstreamLease)
+			}
+			downstreamCl := downstreamBuilder.Build()
+
+			reconciler := &IrohDNSReconciler{
+				mgr:        mgr,
+				Downstream: &clusterWithClient{c: downstreamCl, scheme: testScheme},
+				Config:     newReconciler().Config,
+			}
+
+			ctx := context.Background()
+			_, err := reconciler.Reconcile(ctx, mcreconcile.Request{
+				ClusterName: testClusterName,
+				Request:     reconcile.Request{NamespacedName: client.ObjectKeyFromObject(connector)},
+			})
+			require.NoError(t, err)
+
+			var updated networkingv1alpha1.Connector
+			require.NoError(t, upstreamCl.Get(ctx, client.ObjectKeyFromObject(connector), &updated))
+
+			cond := apimeta.FindStatusCondition(updated.Status.Conditions, connectorConditionIrohDNSPublished)
+			require.NotNil(t, cond, "IrohDNSPublished condition must be set")
+			assert.Equal(t, tt.wantStatus, cond.Status)
+			assert.Equal(t, tt.wantReason, cond.Reason)
+
+			if tt.wantOwnerUID != "" {
+				var record dnsv1alpha1.DNSRecordSet
+				require.NoError(t, downstreamCl.Get(ctx, client.ObjectKey{Namespace: dnsNamespace, Name: recordName}, &record))
+				assert.Equal(t, tt.wantOwnerUID, record.Labels[irohDNSClaimedByUIDLabel], "DNSRecordSet claim label")
+
+				var claimLease coordinationv1.Lease
+				require.NoError(t, downstreamCl.Get(ctx, client.ObjectKey{Namespace: dnsNamespace, Name: recordName}, &claimLease))
+				assert.Equal(t, tt.wantOwnerUID, ptr.Deref(claimLease.Spec.HolderIdentity, ""), "claim Lease holder identity")
+			}
+		})
+	}
+}